Set disk to nil after write which needs quorum (#3795)

Ignore a disk which wasn't able to successfully perform an action to
avoid eventual perturbations when the disk comes back in the middle
of write change.
This commit is contained in:
Anis Elleuch 2017-02-26 20:58:32 +01:00 committed by Harshavardhana
parent 461b2bbd37
commit dce0345f8f
4 changed files with 11 additions and 0 deletions

View File

@ -114,6 +114,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
// Write encoded data to quorum disks in parallel. // Write encoded data to quorum disks in parallel.
for index, disk := range disks { for index, disk := range disks {
if disk == nil { if disk == nil {
wErrs[index] = traceError(errDiskNotFound)
continue continue
} }
wg.Add(1) wg.Add(1)
@ -123,6 +124,8 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
wErr := disk.AppendFile(volume, path, enBlocks[index]) wErr := disk.AppendFile(volume, path, enBlocks[index])
if wErr != nil { if wErr != nil {
wErrs[index] = traceError(wErr) wErrs[index] = traceError(wErr)
// Ignore disk which returned an error.
disks[index] = nil
return return
} }

View File

@ -361,6 +361,8 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]) err := writeXLMetadata(disk, bucket, prefix, xlMetas[index])
if err != nil { if err != nil {
mErrs[index] = err mErrs[index] = err
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk) }(index, disk)
} }
@ -399,6 +401,8 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
err := writeXLMetadata(disk, bucket, prefix, metadata) err := writeXLMetadata(disk, bucket, prefix, metadata)
if err != nil { if err != nil {
mErrs[index] = err mErrs[index] = err
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk, xlMeta) }(index, disk, xlMeta)
} }

View File

@ -254,6 +254,8 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
if rErr != nil { if rErr != nil {
mErrs[index] = traceError(rErr) mErrs[index] = traceError(rErr)
// Ignore disk which returned an error.
disks[index] = nil
return return
} }
mErrs[index] = nil mErrs[index] = nil

View File

@ -395,6 +395,8 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry) err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry)
if err != nil && err != errFileNotFound { if err != nil && err != errFileNotFound {
errs[index] = traceError(err) errs[index] = traceError(err)
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk) }(index, disk)
} }