diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index f57b9503a..9d469f766 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -435,7 +435,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) { {"test-bucket-list-object", "", "", "*", 0, ListObjectsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "*"), false}, {"test-bucket-list-object", "", "", "-", 0, ListObjectsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "-"), false}, // Testing for failure cases with both perfix and marker (11). - // The prefix and marker combination to be valid it should satisy strings.HasPrefix(marker, prefix). + // The prefix and marker combination to be valid it should satisfy strings.HasPrefix(marker, prefix). {"test-bucket-list-object", "asia", "europe-object", "", 0, ListObjectsInfo{}, fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", "europe-object", "asia"), false}, // Setting a non-existing directory to be prefix (12-13). {"empty-bucket", "europe/france/", "", "", 1, ListObjectsInfo{}, nil, true}, diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 947ee6bb6..5ac80fcf6 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -1110,7 +1110,7 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t TestErrHan {bucketNames[0], "", "", "", "*", 0, ListMultipartsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "*"), false}, {bucketNames[0], "", "", "", "-", 0, ListMultipartsInfo{}, fmt.Errorf("delimiter '%s' is not supported", "-"), false}, // Testing for failure cases with both perfix and marker (Test number 10). - // The prefix and marker combination to be valid it should satisy strings.HasPrefix(marker, prefix). + // The prefix and marker combination to be valid it should satisfy strings.HasPrefix(marker, prefix). {bucketNames[0], "asia", "europe-object", "", "", 0, ListMultipartsInfo{}, fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", "europe-object", "asia"), false}, // Setting an invalid combination of uploadIDMarker and Marker (Test number 11-12). diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 29613e480..cc3647598 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -249,8 +249,10 @@ var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errV // readXLMetaParts - returns the XL Metadata Parts from xl.json of one of the disks picked at random. func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []objectPartInfo, err error) { + var ignoredErrs []error for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { + ignoredErrs = append(ignoredErrs, errDiskNotFound) continue } xlMetaParts, err = readXLMetaParts(disk, bucket, object) @@ -260,18 +262,23 @@ func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []object // For any reason disk or bucket is not available continue // and read from other disks. if isErrIgnored(err, objMetadataOpIgnoredErrs...) { + ignoredErrs = append(ignoredErrs, err) continue } - break + // Error is not ignored, return right here. + return nil, err } - // Return error here. - return nil, err + // If all errors were ignored, reduce to maximal occurrence + // based on the read quorum. + return nil, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) } // readXLMetaStat - return xlMetaV1.Stat and xlMetaV1.Meta from one of the disks picked at random. func (xl xlObjects) readXLMetaStat(bucket, object string) (xlStat statInfo, xlMeta map[string]string, err error) { + var ignoredErrs []error for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { + ignoredErrs = append(ignoredErrs, errDiskNotFound) continue } // parses only xlMetaV1.Meta and xlMeta.Stat @@ -282,12 +289,15 @@ func (xl xlObjects) readXLMetaStat(bucket, object string) (xlStat statInfo, xlMe // For any reason disk or bucket is not available continue // and read from other disks. if isErrIgnored(err, objMetadataOpIgnoredErrs...) { + ignoredErrs = append(ignoredErrs, err) continue } - break + // Error is not ignored, return right here. + return statInfo{}, nil, err } - // Return error here. - return statInfo{}, nil, err + // If all errors were ignored, reduce to maximal occurrence + // based on the read quorum. + return statInfo{}, nil, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) } // deleteXLMetadata - deletes `xl.json` on a single disk. diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index d09822a57..c20339e95 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -17,7 +17,9 @@ package cmd import ( + "bytes" "errors" + "path" "strconv" "testing" "time" @@ -25,6 +27,162 @@ import ( humanize "github.com/dustin/go-humanize" ) +// Tests for reading XL object info. +func TestXLReadStat(t *testing.T) { + ExecObjectLayerDiskAlteredTest(t, testXLReadStat) +} + +func testXLReadStat(obj ObjectLayer, instanceType string, disks []string, t *testing.T) { + // Setup for the tests. + bucketName := getRandomBucketName() + objectName := "test-object" + // create bucket. + err := obj.MakeBucket(bucketName) + // Stop the test if creation of the bucket fails. + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + + // set of byte data for PutObject. + // object has to be created before running tests for GetObject. + // this is required even to assert the GetObject data, + // since dataInserted === dataFetched back is a primary criteria for any object storage this assertion is critical. + bytesData := []struct { + byteData []byte + }{ + {generateBytesData(6 * humanize.MiByte)}, + } + // set of inputs for uploading the objects before tests for downloading is done. + putObjectInputs := []struct { + bucketName string + objectName string + contentLength int64 + textData []byte + metaData map[string]string + }{ + // case - 1. + {bucketName, objectName, int64(len(bytesData[0].byteData)), bytesData[0].byteData, make(map[string]string)}, + } + sha256sum := "" + // iterate through the above set of inputs and upkoad the object. + for i, input := range putObjectInputs { + // uploading the object. + _, err = obj.PutObject(input.bucketName, input.objectName, input.contentLength, bytes.NewBuffer(input.textData), input.metaData, sha256sum) + // if object upload fails stop the test. + if err != nil { + t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) + } + } + + _, _, err = obj.(*xlObjects).readXLMetaStat(bucketName, objectName) + if err != nil { + t.Fatal(err) + } + + // Remove one disk. + removeDiskN(disks, 7) + + // Removing disk shouldn't affect reading object info. + _, _, err = obj.(*xlObjects).readXLMetaStat(bucketName, objectName) + if err != nil { + t.Fatal(err) + } + + for _, disk := range disks { + removeAll(path.Join(disk, bucketName)) + } + + _, _, err = obj.(*xlObjects).readXLMetaStat(bucketName, objectName) + if errorCause(err) != errVolumeNotFound { + t.Fatal(err) + } +} + +// Tests for reading XL meta parts. +func TestXLReadMetaParts(t *testing.T) { + ExecObjectLayerDiskAlteredTest(t, testXLReadMetaParts) +} + +// testListObjectParts - Tests validate listing of object parts when disks go offline. +func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t *testing.T) { + bucketNames := []string{"minio-bucket", "minio-2-bucket"} + objectNames := []string{"minio-object-1.txt"} + uploadIDs := []string{} + + // bucketnames[0]. + // objectNames[0]. + // uploadIds [0]. + // Create bucket before intiating NewMultipartUpload. + err := obj.MakeBucket(bucketNames[0]) + if err != nil { + // Failed to create newbucket, abort. + t.Fatalf("%s : %s", instanceType, err.Error()) + } + // Initiate Multipart Upload on the above created bucket. + uploadID, err := obj.NewMultipartUpload(bucketNames[0], objectNames[0], nil) + if err != nil { + // Failed to create NewMultipartUpload, abort. + t.Fatalf("%s : %s", instanceType, err.Error()) + } + + uploadIDs = append(uploadIDs, uploadID) + + // Create multipart parts. + // Need parts to be uploaded before MultipartLists can be called and tested. + createPartCases := []struct { + bucketName string + objName string + uploadID string + PartID int + inputReaderData string + inputMd5 string + intputDataSize int64 + expectedMd5 string + }{ + // Case 1-4. + // Creating sequence of parts for same uploadID. + // Used to ensure that the ListMultipartResult produces one output for the four parts uploaded below for the given upload ID. + {bucketNames[0], objectNames[0], uploadIDs[0], 1, "abcd", "e2fc714c4727ee9395f324cd2e7f331f", int64(len("abcd")), "e2fc714c4727ee9395f324cd2e7f331f"}, + {bucketNames[0], objectNames[0], uploadIDs[0], 2, "efgh", "1f7690ebdd9b4caf8fab49ca1757bf27", int64(len("efgh")), "1f7690ebdd9b4caf8fab49ca1757bf27"}, + {bucketNames[0], objectNames[0], uploadIDs[0], 3, "ijkl", "09a0877d04abf8759f99adec02baf579", int64(len("abcd")), "09a0877d04abf8759f99adec02baf579"}, + {bucketNames[0], objectNames[0], uploadIDs[0], 4, "mnop", "e132e96a5ddad6da8b07bba6f6131fef", int64(len("abcd")), "e132e96a5ddad6da8b07bba6f6131fef"}, + } + sha256sum := "" + // Iterating over creatPartCases to generate multipart chunks. + for _, testCase := range createPartCases { + _, perr := obj.PutObjectPart(testCase.bucketName, testCase.objName, testCase.uploadID, testCase.PartID, testCase.intputDataSize, bytes.NewBufferString(testCase.inputReaderData), testCase.inputMd5, sha256sum) + if perr != nil { + t.Fatalf("%s : %s", instanceType, perr) + } + } + + uploadIDPath := path.Join(bucketNames[0], objectNames[0], uploadIDs[0]) + + _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) + if err != nil { + t.Fatal(err) + } + + // Remove one disk. + removeDiskN(disks, 7) + + // Removing disk shouldn't affect reading object parts info. + _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) + if err != nil { + t.Fatal(err) + } + + for _, disk := range disks { + removeAll(path.Join(disk, bucketNames[0])) + removeAll(path.Join(disk, minioMetaMultipartBucket, bucketNames[0])) + } + + _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) + if errorCause(err) != errFileNotFound { + t.Fatal(err) + } +} + // Test xlMetaV1.AddObjectPart() func TestAddObjectPart(t *testing.T) { testCases := []struct { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index a1e215024..b2a2b6b52 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -208,25 +208,28 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) // statPart - returns fileInfo structure for a successful stat on part file. func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) { + var ignoredErrs []error partNamePath := path.Join(bucket, object, uploadID, partName) for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { + ignoredErrs = append(ignoredErrs, errDiskNotFound) continue } fileInfo, err = disk.StatFile(minioMetaMultipartBucket, partNamePath) if err == nil { return fileInfo, nil } - err = traceError(err) // For any reason disk was deleted or goes offline we continue to next disk. if isErrIgnored(err, objMetadataOpIgnoredErrs...) { + ignoredErrs = append(ignoredErrs, err) continue } - - // Catastrophic error, we return. - break + // Error is not ignored, return right here. + return FileInfo{}, traceError(err) } - return FileInfo{}, err + // If all errors were ignored, reduce to maximal occurrence + // based on the read quorum. + return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) } // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.