diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index e5de6a51a..7ca3849bd 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -1111,3 +1111,97 @@ func TestGetObjectInlineNotInline(t *testing.T) { t.Fatalf("Expected data to have md5sum = `%s`, found `%s`", expectedHash, foundHash) } } + +// Test reading an object with some outdated data in some disks +func TestGetObjectWithOutdatedDisks(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create an instance of xl backend. + obj, fsDirs, err := prepareErasure(ctx, 6) + if err != nil { + t.Fatal(err) + } + + // Cleanup backend directories. + defer obj.Shutdown(context.Background()) + defer removeRoots(fsDirs) + + z := obj.(*erasureServerPools) + sets := z.serverPools[0] + xl := sets.sets[0] + + origErasureDisks := xl.getDisks() + + testCases := []struct { + bucket string + versioned bool + object string + content []byte + }{ + {"bucket1", false, "object1", []byte("aaaaaaaaaaaaaaaa")}, + {"bucket2", false, "object2", bytes.Repeat([]byte{'a'}, smallFileThreshold*2)}, + {"bucket3", true, "version1", []byte("aaaaaaaaaaaaaaaa")}, + {"bucket4", true, "version2", bytes.Repeat([]byte{'a'}, smallFileThreshold*2)}, + } + + for i, testCase := range testCases { + // Step 1: create a bucket + err = z.MakeBucketWithLocation(ctx, testCase.bucket, BucketOptions{VersioningEnabled: testCase.versioned}) + if err != nil { + t.Fatalf("Test %d: Failed to create a bucket: %v", i+1, err) + } + + // Step 2: Upload an object with a random content + initialData := bytes.Repeat([]byte{'b'}, len(testCase.content)) + sets.erasureDisksMu.Lock() + xl.getDisks = func() []StorageAPI { return origErasureDisks } + sets.erasureDisksMu.Unlock() + _, err = z.PutObject(ctx, testCase.bucket, testCase.object, mustGetPutObjReader(t, bytes.NewReader(initialData), int64(len(initialData)), "", ""), + ObjectOptions{Versioned: testCase.versioned}) + if err != nil { + t.Fatalf("Test %d: Failed to upload a random object: %v", i+1, err) + } + + // Step 3: Upload the object with some disks offline + sets.erasureDisksMu.Lock() + xl.getDisks = func() []StorageAPI { + disks := make([]StorageAPI, len(origErasureDisks)) + copy(disks, origErasureDisks) + disks[0] = nil + disks[1] = nil + return disks + } + sets.erasureDisksMu.Unlock() + _, err = z.PutObject(ctx, testCase.bucket, testCase.object, mustGetPutObjReader(t, bytes.NewReader(testCase.content), int64(len(testCase.content)), "", ""), + ObjectOptions{Versioned: testCase.versioned}) + if err != nil { + t.Fatalf("Test %d: Failed to upload the final object: %v", i+1, err) + } + + // Step 4: Try to read the object back and check its md5sum + sets.erasureDisksMu.Lock() + xl.getDisks = func() []StorageAPI { return origErasureDisks } + sets.erasureDisksMu.Unlock() + gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, readLock, ObjectOptions{}) + if err != nil { + t.Fatalf("Expected GetObject to succeed, but failed with %v", err) + } + + h := md5.New() + h.Write(testCase.content) + expectedHash := h.Sum(nil) + + h.Reset() + _, err = io.Copy(h, gr) + if err != nil { + t.Fatalf("Test %d: Failed to calculate md5sum of the object: %v", i+1, err) + } + gr.Close() + foundHash := h.Sum(nil) + + if !bytes.Equal(foundHash, expectedHash) { + t.Fatalf("Test %d: Expected data to have md5sum = `%x`, found `%x`", i+1, expectedHash, foundHash) + } + } +}