diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index db300cf39..b011fbb2b 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -29,7 +29,7 @@ import ( // all the disks, writes also calculate individual block's checksum // for future bit-rot protection. func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, allowEmpty bool, blockSize int64, - dataBlocks, parityBlocks int, algo HashAlgo, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { + dataBlocks, parityBlocks int, algo HashAlgo, writeQuorum int) (newDisks []StorageAPI, bytesWritten int64, checkSums []string, err error) { // Allocated blockSized buffer for reading from incoming stream. buf := make([]byte, blockSize) @@ -43,7 +43,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader // FIXME: this is a bug in Golang, n == 0 and err == // io.ErrUnexpectedEOF for io.ReadFull function. if n == 0 && rErr == io.ErrUnexpectedEOF { - return 0, nil, traceError(rErr) + return nil, 0, nil, traceError(rErr) } if rErr == io.EOF { // We have reached EOF on the first byte read, io.Reader @@ -51,28 +51,28 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader // data. Will create a 0byte file instead. if bytesWritten == 0 && allowEmpty { blocks = make([][]byte, len(disks)) - rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum) + newDisks, rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum) if rErr != nil { - return 0, nil, rErr + return nil, 0, nil, rErr } } // else we have reached EOF after few reads, no need to // add an additional 0bytes at the end. break } if rErr != nil && rErr != io.ErrUnexpectedEOF { - return 0, nil, traceError(rErr) + return nil, 0, nil, traceError(rErr) } if n > 0 { // Returns encoded blocks. var enErr error blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks) if enErr != nil { - return 0, nil, enErr + return nil, 0, nil, enErr } // Write to all disks. - if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil { - return 0, nil, err + if newDisks, err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil { + return nil, 0, nil, err } bytesWritten += int64(n) } @@ -82,7 +82,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader for i := range checkSums { checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil)) } - return bytesWritten, checkSums, nil + return newDisks, bytesWritten, checkSums, nil } // encodeData - encodes incoming data buffer into @@ -110,7 +110,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro } // appendFile - append data buffer at path. -func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) (err error) { +func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) ([]StorageAPI, error) { var wg = &sync.WaitGroup{} var wErrs = make([]error, len(disks)) // Write encoded data to quorum disks in parallel. @@ -126,8 +126,6 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash wErr := disk.AppendFile(volume, path, enBlocks[index]) if wErr != nil { wErrs[index] = traceError(wErr) - // Ignore disk which returned an error. - disks[index] = nil return } @@ -142,5 +140,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash // Wait for all the appends to finish. wg.Wait() - return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum) + return evalDisks(disks, wErrs), reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum) } diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go index bcc9973eb..fcf0ce6c6 100644 --- a/cmd/erasure-createfile_test.go +++ b/cmd/erasure-createfile_test.go @@ -56,7 +56,7 @@ func TestErasureCreateFile(t *testing.T) { t.Fatal(err) } // Test when all disks are up. - size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } @@ -69,7 +69,7 @@ func TestErasureCreateFile(t *testing.T) { disks[5] = AppendDiskDown{disks[5].(*posix)} // Test when two disks are down. - size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } @@ -83,7 +83,7 @@ func TestErasureCreateFile(t *testing.T) { disks[8] = AppendDiskDown{disks[8].(*posix)} disks[9] = AppendDiskDown{disks[9].(*posix)} - size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func TestErasureCreateFile(t *testing.T) { // 1 more disk down. 7 disk down in total. Should return quorum error. disks[10] = AppendDiskDown{disks[10].(*posix)} - _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if errorCause(err) != errXLWriteQuorum { t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err) } diff --git a/cmd/erasure-healfile_test.go b/cmd/erasure-healfile_test.go index 0f820738c..263da827e 100644 --- a/cmd/erasure-healfile_test.go +++ b/cmd/erasure-healfile_test.go @@ -48,7 +48,7 @@ func TestErasureHealFile(t *testing.T) { t.Fatal(err) } // Create a test file. - size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go index 6d7fef8ad..570593194 100644 --- a/cmd/erasure-readfile_test.go +++ b/cmd/erasure-readfile_test.go @@ -242,7 +242,7 @@ func TestErasureReadFileDiskFail(t *testing.T) { } // Create a test file to read from. - size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } @@ -325,7 +325,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) { } // Create a test file to read from. - size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } @@ -404,7 +404,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { iterations := 10000 // Create a test file to read from. - size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) + _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) if err != nil { t.Fatal(err) } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 6bbccce7d..a0a4b43e9 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -468,7 +468,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum } // Generate and write `xl.json` generated from other disks. - aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks)) + outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks)) if aErr != nil { return 0, 0, toObjectErr(aErr, bucket, object) } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index af8bdcd69..47d85d5d6 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -389,7 +389,7 @@ func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error } // Rename `xl.json` content to destination location for each disk in order. -func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) error { +func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) { isDir := false srcXLJSON := path.Join(srcEntry, xlMetaJSONFile) dstXLJSON := path.Join(dstEntry, xlMetaJSONFile) @@ -397,7 +397,7 @@ func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEnt } // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. -func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) error { +func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(disks)) @@ -419,8 +419,6 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]) if err != nil { mErrs[index] = err - // Ignore disk which returned an error. - disks[index] = nil } }(index, disk) } @@ -433,11 +431,11 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) } - return err + return evalDisks(disks, mErrs), err } // writeSameXLMetadata - write `xl.json` on all disks in order. -func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) error { +func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) ([]StorageAPI, error) { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(disks)) @@ -459,8 +457,6 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet err := writeXLMetadata(disk, bucket, prefix, metadata) if err != nil { mErrs[index] = err - // Ignore disk which returned an error. - disks[index] = nil } }(index, disk, xlMeta) } @@ -473,5 +469,5 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) } - return err + return evalDisks(disks, mErrs), err } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 7f266f1d1..6d58c5b97 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -233,7 +233,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf } // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. -func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) error { +func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(disks)) @@ -257,8 +257,6 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) if rErr != nil { mErrs[index] = traceError(rErr) - // Ignore disk which returned an error. - disks[index] = nil return } mErrs[index] = nil @@ -272,7 +270,7 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs) } - return err + return evalDisks(disks, mErrs), err } // listMultipartUploads - lists all multipart uploads. @@ -491,7 +489,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st uploadIDPath := path.Join(bucket, object, uploadID) tempUploadIDPath := uploadID // Write updated `xl.json` to all disks. - err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum) + disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) } @@ -501,7 +499,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath) // Attempt to rename temp upload object to actual upload path object - rErr := renameObject(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) + _, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) if rErr != nil { return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } @@ -657,7 +655,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s allowEmpty := true // Erasure code data and write across all disks. - sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum) + onlineDisks, sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum) if err != nil { return PartInfo{}, toObjectErr(err, bucket, object) } @@ -702,7 +700,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Rename temporary part file to its final location. partPath := path.Join(uploadIDPath, partSuffix) - err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum) + onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum) if err != nil { return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath) } @@ -746,10 +744,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s tempXLMetaPath := newUUID // Writes a unique `xl.json` each disk carrying new checksum related information. - if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil { return PartInfo{}, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath) } - rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) + var rErr error + onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) if rErr != nil { return PartInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } @@ -986,11 +985,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Write unique `xl.json` for each disk. - if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) } - rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) + var rErr error + onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) if rErr != nil { return ObjectInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } @@ -1018,7 +1018,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // NOTE: Do not use online disks slice here. // The reason is that existing object should be purged // regardless of `xl.json` status and rolled back in case of errors. - err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) + _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -1038,7 +1038,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Rename the multipart object to final location. - if err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil { + if onlineDisks, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ec4c6ddd4..558383408 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -94,11 +94,11 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string tempObj := mustGetUUID() // Write unique `xl.json` for each disk. - if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) } // Rename atomically `xl.json` from tmp location to destination for each disk. - if err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil { + if onlineDisks, err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) } return xlMeta.ToObjectInfo(srcBucket, srcObject), nil @@ -374,7 +374,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. -func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) error { +func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) ([]StorageAPI, error) { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -398,8 +398,6 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry) if err != nil && err != errFileNotFound { errs[index] = traceError(err) - // Ignore disk which returned an error. - disks[index] = nil } }(index, disk) } @@ -414,14 +412,14 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // Undo all the partial rename operations. undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs) } - return err + return evalDisks(disks, errs), err } // renamePart - renames a part of the source object to the destination // across all disks in parallel. Additionally if we have errors and do // not have a readQuorum partially renamed files are renamed back to // its proper location. -func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) error { +func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) { isDir := false return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum) } @@ -430,7 +428,7 @@ func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart strin // across all disks in parallel. Additionally if we have errors and do // not have a readQuorum partially renamed files are renamed back to // its proper location. -func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) error { +func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) { isDir := true return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum) } @@ -573,8 +571,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // when size == -1 because in this case, we are not able to predict how many parts we will have. allowEmptyPart := partIdx == 1 + var partSizeWritten int64 + var checkSums []string + var erasureErr error + // Erasure code data and write across all disks. - partSizeWritten, checkSums, erasureErr := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum) + onlineDisks, partSizeWritten, checkSums, erasureErr = erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) } @@ -674,7 +676,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // NOTE: Do not use online disks slice here. // The reason is that existing object should be purged // regardless of `xl.json` status and rolled back in case of errors. - err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) + _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -689,12 +691,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } // Write unique `xl.json` for each disk. - if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } // Rename the successfully written temporary object to final location. - err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum) + onlineDisks, err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 43b79cc2b..b437929eb 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -357,6 +357,24 @@ func shuffleDisks(disks []StorageAPI, distribution []int) (shuffledDisks []Stora return shuffledDisks } +// evalDisks - returns a new slice of disks where nil is set if +// the correspond error in errs slice is not nil +func evalDisks(disks []StorageAPI, errs []error) []StorageAPI { + if len(errs) != len(disks) { + errorIf(errors.New("unexpected disks/errors slice length"), "unable to evaluate internal disks") + return nil + } + newDisks := make([]StorageAPI, len(disks)) + for index := range errs { + if errs[index] == nil { + newDisks[index] = disks[index] + } else { + newDisks[index] = nil + } + } + return newDisks +} + // Errors specifically generated by getPartSizeFromIdx function. var ( errPartSizeZero = errors.New("Part size cannot be zero") diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go index b5c4c8855..b9b4f5ba2 100644 --- a/cmd/xl-v1-utils_test.go +++ b/cmd/xl-v1-utils_test.go @@ -18,6 +18,7 @@ package cmd import ( "encoding/json" + "errors" "reflect" "strconv" "testing" @@ -429,3 +430,61 @@ func testShuffleDisks(t *testing.T, xl *xlObjects) { t.Errorf("shuffleDisks returned incorrect order.") } } + +// TestEvalDisks tests the behavior of evalDisks +func TestEvalDisks(t *testing.T) { + nDisks := 16 + disks, err := getRandomDisks(nDisks) + if err != nil { + t.Fatal(err) + } + objLayer, _, err := initObjectLayer(mustGetNewEndpointList(disks...)) + if err != nil { + removeRoots(disks) + t.Fatal(err) + } + defer removeRoots(disks) + xl := objLayer.(*xlObjects) + testShuffleDisks(t, xl) +} + +func testEvalDisks(t *testing.T, xl *xlObjects) { + disks := xl.storageDisks + + diskErr := errors.New("some disk error") + errs := []error{ + diskErr, nil, nil, nil, + nil, diskErr, nil, nil, + diskErr, nil, nil, nil, + nil, nil, nil, diskErr, + } + + // Test normal setup with some disks + // returning errors + newDisks := evalDisks(disks, errs) + if newDisks[0] != nil || + newDisks[1] != disks[1] || + newDisks[2] != disks[2] || + newDisks[3] != disks[3] || + newDisks[4] != disks[4] || + newDisks[5] != nil || + newDisks[6] != disks[6] || + newDisks[7] != disks[7] || + newDisks[8] != nil || + newDisks[9] != disks[9] || + newDisks[10] != disks[10] || + newDisks[11] != disks[11] || + newDisks[12] != disks[12] || + newDisks[13] != disks[13] || + newDisks[14] != disks[14] || + newDisks[15] != nil { + t.Errorf("evalDisks returned incorrect new disk set.") + } + + // Test when number of errs doesn't match with number of disks + errs = []error{nil, nil, nil, nil} + newDisks = evalDisks(disks, errs) + if newDisks != nil { + t.Errorf("evalDisks returned no nil slice") + } +}