diff --git a/erasure-createfile.go b/erasure-createfile.go
index c12f273b4..8dc70573f 100644
--- a/erasure-createfile.go
+++ b/erasure-createfile.go
@@ -28,7 +28,7 @@ import (
 // erasureCreateFile - writes an entire stream by erasure coding to
 // all the disks, writes also calculate individual block's checksum
 // for future bit-rot protection.
-func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo) (newEInfos []erasureInfo, size int64, err error) {
+func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo, writeQuorum int) (newEInfos []erasureInfo, size int64, err error) {
 	// Allocated blockSized buffer for reading.
 	buf := make([]byte, blockSizeV1)
 	hashWriters := newHashWriters(len(disks))
@@ -53,7 +53,7 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName
 		if err != nil {
 			return nil, 0, err
 		}
-		err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters)
+		err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum)
 		if err != nil {
 			return nil, 0, err
 		}
@@ -109,7 +109,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro
 }
 
 // appendFile - append data buffer at path.
-func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash) (err error) {
+func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash, writeQuorum int) (err error) {
 	var wg = &sync.WaitGroup{}
 	var wErrs = make([]error, len(disks))
 	// Write encoded data to quorum disks in parallel.
@@ -144,6 +144,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
 	// Wait for all the appends to finish.
 	wg.Wait()
 
-	// Return success.
-	return nil
+	return reduceError(wErrs, writeQuorum)
 }
diff --git a/xl-v1-healing.go b/xl-v1-healing.go
index 4adb05112..e8f1f5822 100644
--- a/xl-v1-healing.go
+++ b/xl-v1-healing.go
@@ -69,8 +69,8 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
 	return metadataArray, errs
 }
 
-// error based on total errors and read quorum.
-func (xl xlObjects) reduceError(errs []error) error {
+// error based on total errors and quorum.
+func reduceError(errs []error, quorum int) error {
 	fileNotFoundCount := 0
 	longNameCount := 0
 	diskNotFoundCount := 0
@@ -90,28 +90,28 @@ func (xl xlObjects) reduceError(errs []error) error {
 		}
 	}
 	// If we have errors with 'file not found' greater than
-	// readQuorum, return as errFileNotFound.
+	// quorum, return as errFileNotFound.
 	// else if we have errors with 'volume not found'
-	// greater than readQuorum, return as errVolumeNotFound.
-	if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+	// greater than quorum, return as errVolumeNotFound.
+	if fileNotFoundCount > len(errs)-quorum {
 		return errFileNotFound
-	} else if longNameCount > len(xl.storageDisks)-xl.readQuorum {
+	} else if longNameCount > len(errs)-quorum {
 		return errFileNameTooLong
-	} else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+	} else if volumeNotFoundCount > len(errs)-quorum {
 		return errVolumeNotFound
 	}
 	// If we have errors with disk not found equal to the
 	// number of disks, return as errDiskNotFound.
-	if diskNotFoundCount == len(xl.storageDisks) {
+	if diskNotFoundCount == len(errs) {
 		return errDiskNotFound
-	} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+	} else if diskNotFoundCount > len(errs)-quorum {
 		// If we have errors with 'disk not found'
-		// greater than readQuorum, return as errFileNotFound.
+		// greater than quorum, return as errFileNotFound.
 		return errFileNotFound
 	}
 	// If we have errors with disk not found equal to the
 	// number of disks, return as errDiskNotFound.
-	if diskAccessDeniedCount == len(xl.storageDisks) {
+	if diskAccessDeniedCount == len(errs) {
 		return errVolumeAccessDenied
 	}
 	return nil
@@ -156,7 +156,7 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
 // - error if any.
 func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) {
 	onlineDisks = make([]StorageAPI, len(xl.storageDisks))
-	if err = xl.reduceError(errs); err != nil {
+	if err = reduceError(errs, xl.readQuorum); err != nil {
 		if err == errFileNotFound {
 			// For file not found, treat as if disks are available
 			// return all the configured ones.
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index d324106fa..b119dc33b 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -342,7 +342,7 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
 	}
 
 	// Erasure code data and write across all disks.
-	newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos)
+	newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
 	}
@@ -553,7 +553,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 
 	// Read metadata associated with the object from all disks.
 	partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
-	if err = xl.reduceError(errs); err != nil {
+	if err = reduceError(errs, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
 	}
 
diff --git a/xl-v1-object.go b/xl-v1-object.go
index 9a6989af2..7887ae15b 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -55,7 +55,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 
 	// Read metadata associated with the object from all disks.
 	partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
-	if err := xl.reduceError(errs); err != nil {
+	if err := reduceError(errs, xl.readQuorum); err != nil {
 		return toObjectErr(err, bucket, object)
 	}
 
@@ -296,7 +296,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	}
 
 	// Erasure code and write across all disks.
-	newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "object1", teeReader, eInfos)
+	newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "object1", teeReader, eInfos, xl.writeQuorum)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
 	}
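
Note (illustration, not part of the patch): the change threads writeQuorum through erasureCreateFile and appendFile so that the parallel per-disk appends no longer unconditionally return success; the collected wErrs slice is reduced against the quorum, mirroring how reduceError(errs, quorum) already gates the read paths with xl.readQuorum. Below is a minimal, self-contained sketch of that pattern; the names appendBlocks, reduceWriteErrs, and errWriteQuorum are hypothetical stand-ins, and the error classification is simplified compared to reduceError in xl-v1-healing.go.

// A runnable sketch of the quorum-reduction pattern used by this patch.
// All names here (appendBlocks, reduceWriteErrs, errWriteQuorum) are
// hypothetical; they are not the actual minio identifiers.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical sentinel errors standing in for the ones used by the patch.
var (
	errDiskNotFound = errors.New("disk not found")
	errWriteQuorum  = errors.New("write quorum not met")
)

// reduceWriteErrs collapses one error per disk into a single result:
// the write succeeds if at most len(errs)-quorum disks failed.
// (The real reduceError also classifies the error types; that is
// omitted here for brevity.)
func reduceWriteErrs(errs []error, quorum int) error {
	failed := 0
	for _, err := range errs {
		if err != nil {
			failed++
		}
	}
	if failed > len(errs)-quorum {
		return errWriteQuorum
	}
	return nil
}

// appendBlocks mimics appendFile's shape: one goroutine per disk writes
// its block and records the outcome in wErrs, then a single quorum
// check decides whether the append as a whole succeeded.
func appendBlocks(writes []func() error, writeQuorum int) error {
	wErrs := make([]error, len(writes))
	var wg sync.WaitGroup
	for i, write := range writes {
		wg.Add(1)
		go func(i int, write func() error) {
			defer wg.Done()
			wErrs[i] = write()
		}(i, write)
	}
	// Wait for all the appends to finish, then reduce the errors.
	wg.Wait()
	return reduceWriteErrs(wErrs, writeQuorum)
}

func main() {
	// Four "disks" with a write quorum of three: one failure is tolerated.
	writes := []func() error{
		func() error { return nil },
		func() error { return nil },
		func() error { return errDiskNotFound },
		func() error { return nil },
	}
	fmt.Println(appendBlocks(writes, 3)) // <nil>

	// Two failures exceed the tolerated count and surface as an error.
	writes[1] = func() error { return errDiskNotFound }
	fmt.Println(appendBlocks(writes, 3)) // write quorum not met
}

The reduction treats a write as successful as long as at most len(errs)-quorum disks failed, which is the same tolerance the read paths apply via xl.readQuorum.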