XL: appendFile should return error if quorum is not met. (#1898)

Fixes #1890
This commit is contained in:
Krishna Srinivas 2016-06-15 00:24:49 +05:30 committed by Harshavardhana
parent afc3102488
commit de1c7d33eb
4 changed files with 20 additions and 21 deletions

View File

@ -28,7 +28,7 @@ import (
// erasureCreateFile - writes an entire stream by erasure coding to
// all the disks, writes also calculate individual block's checksum
// for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo) (newEInfos []erasureInfo, size int64, err error) {
func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo, writeQuorum int) (newEInfos []erasureInfo, size int64, err error) {
// Allocated blockSized buffer for reading.
buf := make([]byte, blockSizeV1)
hashWriters := newHashWriters(len(disks))
@ -53,7 +53,7 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName
if err != nil {
return nil, 0, err
}
err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters)
err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum)
if err != nil {
return nil, 0, err
}
@ -109,7 +109,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro
}
// appendFile - append data buffer at path.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash) (err error) {
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash, writeQuorum int) (err error) {
var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(disks))
// Write encoded data to quorum disks in parallel.
@ -144,6 +144,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
// Wait for all the appends to finish.
wg.Wait()
// Return success.
return nil
return reduceError(wErrs, writeQuorum)
}

View File

@ -69,8 +69,8 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
return metadataArray, errs
}
// error based on total errors and read quorum.
func (xl xlObjects) reduceError(errs []error) error {
// error based on total errors and quorum.
func reduceError(errs []error, quorum int) error {
fileNotFoundCount := 0
longNameCount := 0
diskNotFoundCount := 0
@ -90,28 +90,28 @@ func (xl xlObjects) reduceError(errs []error) error {
}
}
// If we have errors with 'file not found' greater than
// readQuorum, return as errFileNotFound.
// quorum, return as errFileNotFound.
// else if we have errors with 'volume not found'
// greater than readQuorum, return as errVolumeNotFound.
if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
// greater than quorum, return as errVolumeNotFound.
if fileNotFoundCount > len(errs)-quorum {
return errFileNotFound
} else if longNameCount > len(xl.storageDisks)-xl.readQuorum {
} else if longNameCount > len(errs)-quorum {
return errFileNameTooLong
} else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
} else if volumeNotFoundCount > len(errs)-quorum {
return errVolumeNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskNotFoundCount == len(xl.storageDisks) {
if diskNotFoundCount == len(errs) {
return errDiskNotFound
} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
} else if diskNotFoundCount > len(errs)-quorum {
// If we have errors with 'disk not found'
// greater than readQuorum, return as errFileNotFound.
// greater than quorum, return as errFileNotFound.
return errFileNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskAccessDeniedCount == len(xl.storageDisks) {
if diskAccessDeniedCount == len(errs) {
return errVolumeAccessDenied
}
return nil
@ -156,7 +156,7 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
// - error if any.
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) {
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
if err = xl.reduceError(errs); err != nil {
if err = reduceError(errs, xl.readQuorum); err != nil {
if err == errFileNotFound {
// For file not found, treat as if disks are available
// return all the configured ones.

View File

@ -342,7 +342,7 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
}
// Erasure code data and write across all disks.
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos)
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
@ -553,7 +553,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
if err = xl.reduceError(errs); err != nil {
if err = reduceError(errs, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}

View File

@ -55,7 +55,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
if err := xl.reduceError(errs); err != nil {
if err := reduceError(errs, xl.readQuorum); err != nil {
return toObjectErr(err, bucket, object)
}
@ -296,7 +296,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
// Erasure code and write across all disks.
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "object1", teeReader, eInfos)
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "object1", teeReader, eInfos, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
}