From 21c868a646f1c1d2aad8b1316a1914938afd1643 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Dec 2021 08:46:33 -0800 Subject: [PATCH] fix: do not ignore delete-marker directories in ListObjects() (#13804) Following scenario such as objects that exist inside a prefix say `folder/` must be included in the listObjects() response. ``` 2aa16073-387e-492c-9d59-b4b0b7b6997a v2 DEL folder/ a5b9ce68-7239-4921-90ab-20aed402c7a2 v1 PUT folder/ f2211798-0eeb-4d9e-9184-fcfeae27d069 v1 PUT folder/1.txt ``` Current master does not handle this scenario, because it ignores the top level delete-marker on folders. This is however unexpected. It is expected that list-objects returns the top level prefix in this situation. ``` aws s3api list-objects --bucket harshavardhana --prefix unique/ \ --delimiter / --profile minio --endpoint-url http://localhost:9000 { "CommonPrefixes": [ { "Prefix": "unique/folder/" } ] } ``` There are applications in the wild such as Hadoop s3a connector that exploit this behavior and expect the folder to be present in the response. This also makes the behavior consistent with AWS S3. --- cmd/erasure-server-pool.go | 15 ++- cmd/object-api-listobjects_test.go | 208 ++++++++++++++++++++++++++++- 2 files changed, 211 insertions(+), 12 deletions(-) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 5455c985a..9f1f808d3 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1118,12 +1118,15 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } opts := listPathOptions{ - Bucket: bucket, - Prefix: prefix, - Separator: delimiter, - Limit: maxKeysPlusOne(maxKeys, marker != ""), - Marker: marker, - InclDeleted: false, + Bucket: bucket, + Prefix: prefix, + Separator: delimiter, + Limit: maxKeysPlusOne(maxKeys, marker != ""), + Marker: marker, + // When delimiter is set make sure to include delete markers + // necessary to capture proper CommonPrefixes as expected + // in the response as per AWS S3. + InclDeleted: delimiter != "", AskDisks: globalAPIConfig.getListQuorum(), } merged, err := z.listPath(ctx, &opts) diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index 0feb27995..5265423a1 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -30,13 +30,203 @@ import ( "testing" ) -// Wrapper for calling ListObjects tests for both Erasure multiple disks and single node setup. +func TestListObjectsVersionedFolders(t *testing.T) { + ExecObjectLayerTest(t, testListObjectsVersionedFolders) +} + +func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + if instanceType == FSTestStr { + return + } + t, _ := t1.(*testing.T) + testBuckets := []string{ + // This bucket is used for testing ListObject operations. + "test-bucket-folders", + } + for _, bucket := range testBuckets { + err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{ + VersioningEnabled: true, + }) + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + } + + var err error + testObjects := []struct { + parentBucket string + name string + content string + meta map[string]string + addDeleteMarker bool + }{ + {testBuckets[0], "unique/folder/", "", nil, true}, + {testBuckets[0], "unique/folder/1.txt", "content", nil, false}, + } + for _, object := range testObjects { + md5Bytes := md5.Sum([]byte(object.content)) + _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), + int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + UserDefined: object.meta, + }) + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + if object.addDeleteMarker { + oi, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + }) + if err != nil { + t.Fatalf("%s : %s", instanceType, err.Error()) + } + if oi.DeleteMarker != object.addDeleteMarker { + t.Fatalf("Expected, marker %t : got %t", object.addDeleteMarker, oi.DeleteMarker) + } + } + } + + // Formulating the result data set to be expected from ListObjects call inside the tests, + // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. + + resultCases := []ListObjectsInfo{ + // ListObjectsResult-0. + // Testing list only the top-level folder. + { + IsTruncated: false, + Prefixes: []string{"unique/folder/"}, + }, + { + IsTruncated: false, + Objects: []ObjectInfo{ + {Name: "unique/folder/1.txt"}, + }, + }, + } + + testCases := []struct { + // Inputs to ListObjects. + bucketName string + prefix string + marker string + delimiter string + maxKeys int32 + // Expected output of ListObjects. + result ListObjectsInfo + err error + // Flag indicating whether the test is expected to pass or not. + shouldPass bool + }{ + {testBuckets[0], "unique/", "", "/", 1000, resultCases[0], nil, true}, + {testBuckets[0], "unique/folder", "", "/", 1000, resultCases[0], nil, true}, + {testBuckets[0], "unique/", "", "", 1000, resultCases[1], nil, true}, + } + + for i, testCase := range testCases { + testCase := testCase + t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { + t.Log("ListObjects, bucket:", testCase.bucketName, "prefix:", testCase.prefix, "marker:", testCase.marker, "delimiter:", testCase.delimiter, "maxkeys:", testCase.maxKeys) + result, err := obj.ListObjects(context.Background(), testCase.bucketName, + testCase.prefix, testCase.marker, testCase.delimiter, int(testCase.maxKeys)) + if err != nil && testCase.shouldPass { + t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, err.Error()) + } + if err == nil && !testCase.shouldPass { + t.Errorf("Test %d: %s: Expected to fail with \"%s\", but passed instead", i+1, instanceType, testCase.err.Error()) + } + // Failed as expected, but does it fail for the expected reason. + if err != nil && !testCase.shouldPass { + if !strings.Contains(err.Error(), testCase.err.Error()) { + t.Errorf("Test %d: %s: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead", i+1, instanceType, testCase.err.Error(), err.Error()) + } + } + // Since there are cases for which ListObjects fails, this is + // necessary. Test passes as expected, but the output values + // are verified for correctness here. + if err == nil && testCase.shouldPass { + // The length of the expected ListObjectsResult.Objects + // should match in both expected result from test cases + // and in the output. On failure calling t.Fatalf, + // otherwise it may lead to index out of range error in + // assertion following this. + if len(testCase.result.Objects) != len(result.Objects) { + t.Logf("want: %v", objInfoNames(testCase.result.Objects)) + t.Logf("got: %v", objInfoNames(result.Objects)) + t.Errorf("Test %d: %s: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, instanceType, len(testCase.result.Objects), len(result.Objects)) + } + for j := 0; j < len(testCase.result.Objects); j++ { + if j >= len(result.Objects) { + t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name) + continue + } + if testCase.result.Objects[j].Name != result.Objects[j].Name { + t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, result.Objects[j].Name) + } + } + + if len(testCase.result.Prefixes) != len(result.Prefixes) { + t.Logf("want: %v", testCase.result.Prefixes) + t.Logf("got: %v", result.Prefixes) + t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", i+1, instanceType, len(testCase.result.Prefixes), len(result.Prefixes)) + } + for j := 0; j < len(testCase.result.Prefixes); j++ { + if j >= len(result.Prefixes) { + t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j]) + continue + } + if testCase.result.Prefixes[j] != result.Prefixes[j] { + t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], result.Prefixes[j]) + } + } + + if testCase.result.IsTruncated != result.IsTruncated { + // Allow an extra continuation token. + if !result.IsTruncated || len(result.Objects) == 0 { + t.Errorf("Test %d: %s: Expected IsTruncated flag to be %v, but instead found it to be %v", i+1, instanceType, testCase.result.IsTruncated, result.IsTruncated) + } + } + + if testCase.result.IsTruncated && result.NextMarker == "" { + t.Errorf("Test %d: %s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType) + } + + if !testCase.result.IsTruncated && result.NextMarker != "" { + if !result.IsTruncated || len(result.Objects) == 0 { + t.Errorf("Test %d: %s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, result.NextMarker) + } + } + + } + }) + } +} + +// Wrapper for calling ListObjectsOnVersionedBuckets tests for both +// Erasure multiple disks and single node setup. +func TestListObjectsOnVersionedBuckets(t *testing.T) { + ExecObjectLayerTest(t, testListObjectsOnVersionedBuckets) +} + +// Wrapper for calling ListObjects tests for both Erasure multiple +// disks and single node setup. func TestListObjects(t *testing.T) { ExecObjectLayerTest(t, testListObjects) } -// Unit test for ListObjects in general. +// Unit test for ListObjects on VersionedBucket. +func testListObjectsOnVersionedBuckets(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + _testListObjects(obj, instanceType, t1, true) +} + +// Unit test for ListObjects. func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + _testListObjects(obj, instanceType, t1, false) +} + +func _testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler, versioned bool) { + if instanceType == FSTestStr && versioned { + return + } t, _ := t1.(*testing.T) testBuckets := []string{ // This bucket is used for testing ListObject operations. @@ -54,7 +244,9 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) { "test-bucket-max-keys-prefixes", } for _, bucket := range testBuckets { - err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{}) + err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{ + VersioningEnabled: versioned, + }) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) } @@ -92,15 +284,19 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) { } for _, object := range testObjects { md5Bytes := md5.Sum([]byte(object.content)) - _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), - int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta}) + _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, + mustGetPutObjReader(t, bytes.NewBufferString(object.content), + int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + UserDefined: object.meta, + }) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) } } - // Formualting the result data set to be expected from ListObjects call inside the tests, + // Formulating the result data set to be expected from ListObjects call inside the tests, // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. resultCases := []ListObjectsInfo{