From c897b6a82db5a4b3c48d63494f8be8ca7534f341 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 10 Nov 2021 19:41:21 +0100 Subject: [PATCH] fix: missing entries on first list resume (#13627) On first list resume or when specifying a custom markers entries could be missed in rare cases. Do conservative truncation of entries when forwarding. Replaces #13619 --- cmd/metacache-walk.go | 12 ++- cmd/object-api-listobjects_test.go | 146 +++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 4 deletions(-) diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 02621aa59..c3a675f55 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -116,6 +116,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ forward := "" if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) { forward = strings.TrimPrefix(opts.ForwardTo, current) + // Trim further directories and trailing slash. if idx := strings.IndexByte(forward, '/'); idx > 0 { forward = forward[:idx] } @@ -163,7 +164,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ entries[i] = entry continue } - // Trim slash, maybe compiler is clever? + // Trim slash, since we don't know if this is folder or object. entries[i] = entries[i][:len(entry)-1] continue } @@ -214,9 +215,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ dirStack := make([]string, 0, 5) prefix = "" // Remove prefix after first level as we have already filtered the list. if len(forward) > 0 { - idx := sort.SearchStrings(entries, forward) - if idx > 0 { - entries = entries[idx:] + // Conservative forwarding. Entries may be either objects or prefixes. + for i, entry := range entries { + if entry >= forward || strings.HasPrefix(forward, entry) { + entries = entries[i:] + break + } } } diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index 968de6da2..240ff4202 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -1378,6 +1378,152 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand } } +// Wrapper for calling ListObjects continuation tests for both Erasure multiple disks and single node setup. +func TestListObjectsContinuation(t *testing.T) { + ExecObjectLayerTest(t, testListObjectsContinuation) +} + +// Unit test for ListObjects in general. +func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + t, _ := t1.(*testing.T) + testBuckets := []string{ + // This bucket is used for testing ListObject operations. + "test-bucket-list-object-continuation-1", + "test-bucket-list-object-continuation-2", + } + for _, bucket := range testBuckets { + err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{}) + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + } + + var err error + testObjects := []struct { + parentBucket string + name string + content string + meta map[string]string + }{ + {testBuckets[0], "a/1.txt", "contentstring", nil}, + {testBuckets[0], "a-1.txt", "contentstring", nil}, + {testBuckets[0], "a.txt", "contentstring", nil}, + {testBuckets[0], "apache2-doc/1.txt", "contentstring", nil}, + {testBuckets[0], "apache2/1.txt", "contentstring", nil}, + {testBuckets[0], "apache2/-sub/2.txt", "contentstring", nil}, + {testBuckets[1], "azerty/1.txt", "contentstring", nil}, + {testBuckets[1], "apache2-doc/1.txt", "contentstring", nil}, + {testBuckets[1], "apache2/1.txt", "contentstring", nil}, + } + for _, object := range testObjects { + md5Bytes := md5.Sum([]byte(object.content)) + _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), + int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta}) + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + + } + + // Formualting the result data set to be expected from ListObjects call inside the tests, + // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. + + resultCases := []ListObjectsInfo{ + { + Objects: []ObjectInfo{ + {Name: "a-1.txt"}, + {Name: "a.txt"}, + {Name: "a/1.txt"}, + {Name: "apache2-doc/1.txt"}, + {Name: "apache2/-sub/2.txt"}, + {Name: "apache2/1.txt"}, + }, + }, + { + Objects: []ObjectInfo{ + {Name: "apache2-doc/1.txt"}, + {Name: "apache2/1.txt"}, + }, + }, + { + Prefixes: []string{"apache2-doc/", "apache2/", "azerty/"}, + }, + } + + testCases := []struct { + // Inputs to ListObjects. + bucketName string + prefix string + delimiter string + page int + // Expected output of ListObjects. + result ListObjectsInfo + }{ + {testBuckets[0], "", "", 1, resultCases[0]}, + {testBuckets[0], "a", "", 1, resultCases[0]}, + {testBuckets[1], "apache", "", 1, resultCases[1]}, + {testBuckets[1], "", "/", 1, resultCases[2]}, + } + + for i, testCase := range testCases { + testCase := testCase + t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { + var foundObjects []ObjectInfo + var foundPrefixes []string + var marker = "" + for { + result, err := obj.ListObjects(context.Background(), testCase.bucketName, + testCase.prefix, marker, testCase.delimiter, testCase.page) + if err != nil { + t.Fatalf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, err.Error()) + } + foundObjects = append(foundObjects, result.Objects...) + foundPrefixes = append(foundPrefixes, result.Prefixes...) + if !result.IsTruncated { + break + } + marker = result.NextMarker + if len(result.Objects) > 0 { + // Discard marker, so it cannot resume listing. + marker = result.Objects[len(result.Objects)-1].Name + } + } + + if len(testCase.result.Objects) != len(foundObjects) { + t.Logf("want: %v", objInfoNames(testCase.result.Objects)) + t.Logf("got: %v", objInfoNames(foundObjects)) + t.Errorf("Test %d: %s: Expected number of objects in the result to be '%d', but found '%d' objects instead", + i+1, instanceType, len(testCase.result.Objects), len(foundObjects)) + } + for j := 0; j < len(testCase.result.Objects); j++ { + if j >= len(foundObjects) { + t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name) + continue + } + if testCase.result.Objects[j].Name != foundObjects[j].Name { + t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, foundObjects[j].Name) + } + } + + if len(testCase.result.Prefixes) != len(foundPrefixes) { + t.Logf("want: %v", testCase.result.Prefixes) + t.Logf("got: %v", foundPrefixes) + t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", + i+1, instanceType, len(testCase.result.Prefixes), len(foundPrefixes)) + } + for j := 0; j < len(testCase.result.Prefixes); j++ { + if j >= len(foundPrefixes) { + t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j]) + continue + } + if testCase.result.Prefixes[j] != foundPrefixes[j] { + t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], foundPrefixes[j]) + } + } + }) + } +} + // Initialize FS backend for the benchmark. func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) { var err error