Merge pull request #1849 from harshavardhana/multipart

XL/PutObject: Handle all pending cases of DiskNotFound.
This commit is contained in:
Harshavardhana 2016-06-03 11:58:04 -07:00
commit 70a1231f02
8 changed files with 81 additions and 17 deletions

View File

@ -249,6 +249,11 @@ func (s posix) StatVol(volume string) (volInfo VolInfo, err error) {
// DeleteVol - delete a volume.
func (s posix) DeleteVol(volume string) error {
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -274,6 +279,11 @@ func (s posix) DeleteVol(volume string) error {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (s posix) ListDir(volume, dirPath string) ([]string, error) {
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return nil, err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -296,6 +306,11 @@ func (s posix) ListDir(volume, dirPath string) ([]string, error) {
// for io.EOF. Additionally ReadFile also starts reading from an
// offset.
func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return 0, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return 0, err
@ -356,6 +371,11 @@ func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) {
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return 0, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return 0, err
@ -403,6 +423,11 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error)
// StatFile - get file info.
func (s posix) StatFile(volume, path string) (file FileInfo, err error) {
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return FileInfo{}, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return FileInfo{}, err
@ -480,6 +505,11 @@ func deleteFile(basePath, deletePath string) error {
// DeleteFile - delete a file at path.
func (s posix) DeleteFile(volume, path string) error {
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
@ -506,6 +536,11 @@ func (s posix) DeleteFile(volume, path string) error {
// RenameFile - rename source path to destination path atomically.
func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
// Validate if disk is free.
if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
srcVolumeDir, err := s.getVolDir(srcVolume)
if err != nil {
return err

View File

@ -53,6 +53,11 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
}
entries, err = disk.ListDir(bucket, prefixDir)
if err != nil {
// For any reason disk was deleted or goes offline, continue
// and list form other disks if possible.
if err == errDiskNotFound {
continue
}
break
}
// Skip the entries which do not match the filter.

View File

@ -56,9 +56,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
err := disk.MakeVol(bucket)
if err != nil {
dErrs[index] = err
return
}
dErrs[index] = nil
}(index, disk)
}
@ -99,7 +97,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
var volInfo VolInfo
volInfo, err = disk.StatVol(bucketName)
if err != nil {
// For some reason disk went offline pick the next one.
// For any reason disk went offline continue and pick the next one.
if err == errDiskNotFound {
continue
}
@ -154,6 +152,10 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
}
var volsInfo []VolInfo
volsInfo, err = disk.ListVols()
// Ignore any disks not found.
if err == errDiskNotFound {
continue
}
if err == nil {
// NOTE: The assumption here is that volumes across all disks in
// readQuorum have consistent view i.e they all have same number
@ -218,9 +220,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
err := disk.DeleteVol(bucket)
if err != nil {
dErrs[index] = err
return
}
dErrs[index] = nil
}(index, disk)
}

View File

@ -63,6 +63,9 @@ func (xl xlObjects) isObject(bucket, prefix string) bool {
}
_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
if err != nil {
if err == errDiskNotFound {
continue
}
return false
}
break

View File

@ -209,6 +209,10 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
var buf []byte
buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
if err != nil {
// For any reason disk is not available continue and read from other disks.
if err == errDiskNotFound {
continue
}
return xlMetaV1{}, err
}
err = json.Unmarshal(buf, &xlMeta)
@ -338,11 +342,10 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet
xlMetas[index].Erasure.Index = index + 1
// Write unique `xl.json` for a disk at index.
if err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]); err != nil {
err := writeXLMetadata(disk, bucket, prefix, xlMetas[index])
if err != nil {
mErrs[index] = err
return
}
mErrs[index] = nil
}(index, disk)
}
@ -404,11 +407,10 @@ func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1)
metadata.Erasure.Index = index + 1
// Write xl metadata.
if err := writeXLMetadata(disk, bucket, prefix, metadata); err != nil {
err := writeXLMetadata(disk, bucket, prefix, metadata)
if err != nil {
mErrs[index] = err
return
}
mErrs[index] = nil
}(index, disk, xlMeta)
}

View File

@ -363,6 +363,9 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
}
_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
if err != nil {
if err == errDiskNotFound {
continue
}
return false
}
break
@ -377,8 +380,12 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo
continue
}
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
var uploadsJSON uploadsV1
uploadsJSON, err = readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {
if err == errDiskNotFound {
continue
}
if err == errFileNotFound {
return []uploadInfo{}, nil
}

View File

@ -68,6 +68,9 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
continue
}
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
if err == errDiskNotFound {
continue
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
@ -124,9 +127,12 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if disk == nil {
continue
}
newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
if err == errDiskNotFound {
continue
}
break
}
newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
if err != nil {
if err == errFileNotFound || walkResult.err == errDiskNotFound {
@ -661,13 +667,17 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
var disk StorageAPI
var uploadsJSON uploadsV1
for _, disk = range xl.getLoadBalancedQuorumDisks() {
if disk == nil {
continue
}
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
if err == errDiskNotFound {
continue
}
break
}
uploadsJSON, err := readUploadsJSON(bucket, object, disk)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, object)
}
@ -709,13 +719,17 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
var disk StorageAPI
var uploadsJSON uploadsV1
for _, disk = range xl.getLoadBalancedQuorumDisks() {
if disk == nil {
continue
}
uploadsJSON, err = readUploadsJSON(bucket, object, disk)
if err == errDiskNotFound {
continue
}
break
}
uploadsJSON, err := readUploadsJSON(bucket, object, disk)
if err != nil {
return toObjectErr(err, bucket, object)
}

View File

@ -401,9 +401,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
err := cleanupDir(disk, bucket, object)
if err != nil {
dErrs[index] = err
return
}
dErrs[index] = nil
}(index, disk)
}