test: Add GetObjectNInfo test with some outdated disks (#15004)

Add a test reading an object which has some old data in some outdated
disks, in a versioned and non-versioned bucket.
This commit is contained in:
Anis Elleuch 2022-05-31 01:52:59 +01:00 committed by GitHub
parent d480022711
commit 56a61bab56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1111,3 +1111,97 @@ func TestGetObjectInlineNotInline(t *testing.T) {
t.Fatalf("Expected data to have md5sum = `%s`, found `%s`", expectedHash, foundHash)
}
}
// Test reading an object with some outdated data in some disks
func TestGetObjectWithOutdatedDisks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure(ctx, 6)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories.
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
sets := z.serverPools[0]
xl := sets.sets[0]
origErasureDisks := xl.getDisks()
testCases := []struct {
bucket string
versioned bool
object string
content []byte
}{
{"bucket1", false, "object1", []byte("aaaaaaaaaaaaaaaa")},
{"bucket2", false, "object2", bytes.Repeat([]byte{'a'}, smallFileThreshold*2)},
{"bucket3", true, "version1", []byte("aaaaaaaaaaaaaaaa")},
{"bucket4", true, "version2", bytes.Repeat([]byte{'a'}, smallFileThreshold*2)},
}
for i, testCase := range testCases {
// Step 1: create a bucket
err = z.MakeBucketWithLocation(ctx, testCase.bucket, BucketOptions{VersioningEnabled: testCase.versioned})
if err != nil {
t.Fatalf("Test %d: Failed to create a bucket: %v", i+1, err)
}
// Step 2: Upload an object with a random content
initialData := bytes.Repeat([]byte{'b'}, len(testCase.content))
sets.erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI { return origErasureDisks }
sets.erasureDisksMu.Unlock()
_, err = z.PutObject(ctx, testCase.bucket, testCase.object, mustGetPutObjReader(t, bytes.NewReader(initialData), int64(len(initialData)), "", ""),
ObjectOptions{Versioned: testCase.versioned})
if err != nil {
t.Fatalf("Test %d: Failed to upload a random object: %v", i+1, err)
}
// Step 3: Upload the object with some disks offline
sets.erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
disks := make([]StorageAPI, len(origErasureDisks))
copy(disks, origErasureDisks)
disks[0] = nil
disks[1] = nil
return disks
}
sets.erasureDisksMu.Unlock()
_, err = z.PutObject(ctx, testCase.bucket, testCase.object, mustGetPutObjReader(t, bytes.NewReader(testCase.content), int64(len(testCase.content)), "", ""),
ObjectOptions{Versioned: testCase.versioned})
if err != nil {
t.Fatalf("Test %d: Failed to upload the final object: %v", i+1, err)
}
// Step 4: Try to read the object back and check its md5sum
sets.erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI { return origErasureDisks }
sets.erasureDisksMu.Unlock()
gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, readLock, ObjectOptions{})
if err != nil {
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
}
h := md5.New()
h.Write(testCase.content)
expectedHash := h.Sum(nil)
h.Reset()
_, err = io.Copy(h, gr)
if err != nil {
t.Fatalf("Test %d: Failed to calculate md5sum of the object: %v", i+1, err)
}
gr.Close()
foundHash := h.Sum(nil)
if !bytes.Equal(foundHash, expectedHash) {
t.Fatalf("Test %d: Expected data to have md5sum = `%x`, found `%x`", i+1, expectedHash, foundHash)
}
}
}