fix: delimiter based listing was broken without marker (#11136)

with missing nextMarker with delimiter based listing,
top level prefixes beyond 4500 or max-keys value
wouldn't be sent back for client to ask for the next
batch.

reproduced at a customer deployment, create prefixes
as shown below

```
for year in $(seq 2017 2020)
do
    for month in {01..12}
    do for day in {01..31}
       do
           mc -q cp file myminio/testbucket/dir/day_id=$year-$month-$day/;
       done
    done
done
```

Then perform

```
aws s3api --profile minio --endpoint-url http://localhost:9000 list-objects \
    --bucket testbucket --prefix dir/ --delimiter / --max-keys 1000
```

You shall see missing NextMarker, this would disallow listing beyond max-keys
requested and also disallow beyond 4500 (maxKeyObjectList) prefixes being listed
because client wouldn't know the NextMarker available.

This PR addresses this situation properly by making the implementation
more spec compatible. i.e NextMarker in-fact can be either an object, a prefix
with delimiter depending on the input operation.

This issue was introduced after the list caching changes and has been present
for a while.
This commit is contained in:
Harshavardhana 2020-12-19 09:36:04 -08:00 committed by GitHub
parent 6128304f6e
commit e5d378931d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 51 deletions

View File

@ -712,14 +712,21 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
if err != nil && err != io.EOF {
return loi, err
}
loi.Objects, loi.Prefixes = merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)
loi.IsTruncated = err == nil && len(loi.Objects) > 0
if maxKeys > 0 && len(loi.Objects) > maxKeys {
loi.Objects = loi.Objects[:maxKeys]
objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)
loi.IsTruncated = err == nil && len(objects) > 0
if maxKeys > 0 && len(objects) > maxKeys {
objects = objects[:maxKeys]
loi.IsTruncated = true
}
for _, obj := range objects {
if obj.IsDir && delimiter != "" {
loi.Prefixes = append(loi.Prefixes, obj.Name)
} else {
loi.Objects = append(loi.Objects, obj)
}
}
if loi.IsTruncated {
last := loi.Objects[len(loi.Objects)-1]
last := objects[len(objects)-1]
loi.NextMarker = encodeMarker(last.Name, merged.listID)
loi.NextVersionIDMarker = last.VersionID
}
@ -728,6 +735,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var loi ListObjectsInfo
merged, err := z.listPath(ctx, listPathOptions{
Bucket: bucket,
Prefix: prefix,
@ -741,11 +749,24 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
logger.LogIf(ctx, err)
return loi, err
}
// Default is recursive, if delimiter is set then list non recursive.
loi.Objects, loi.Prefixes = merged.fileInfos(bucket, prefix, delimiter)
loi.IsTruncated = err == nil && len(loi.Objects) > 0
objects := merged.fileInfos(bucket, prefix, delimiter)
loi.IsTruncated = err == nil && len(objects) > 0
if maxKeys > 0 && len(objects) > maxKeys {
objects = objects[:maxKeys]
loi.IsTruncated = true
}
for _, obj := range objects {
if obj.IsDir && delimiter != "" {
loi.Prefixes = append(loi.Prefixes, obj.Name)
} else {
loi.Objects = append(loi.Objects, obj)
}
}
if loi.IsTruncated {
loi.NextMarker = encodeMarker(loi.Objects[len(loi.Objects)-1].Name, merged.listID)
last := objects[len(objects)-1]
loi.NextMarker = encodeMarker(last.Name, merged.listID)
}
return loi, nil
}

View File

@ -309,7 +309,7 @@ func (m *metaCacheEntriesSorted) iterate(fn func(entry metaCacheEntry) (cont boo
// fileInfoVersions converts the metadata to FileInfoVersions where possible.
// Metadata that cannot be decoded is skipped.
func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, afterV string) (versions []ObjectInfo, commonPrefixes []string) {
func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, afterV string) (versions []ObjectInfo) {
versions = make([]ObjectInfo, 0, m.len())
prevPrefix := ""
for _, entry := range m.o {
@ -323,7 +323,11 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
continue
}
prevPrefix = currPrefix
commonPrefixes = append(commonPrefixes, currPrefix)
versions = append(versions, ObjectInfo{
IsDir: true,
Bucket: bucket,
Name: currPrefix,
})
continue
}
}
@ -356,17 +360,20 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
continue
}
prevPrefix = currPrefix
commonPrefixes = append(commonPrefixes, currPrefix)
continue
versions = append(versions, ObjectInfo{
IsDir: true,
Bucket: bucket,
Name: currPrefix,
})
}
}
return versions, commonPrefixes
return versions
}
// fileInfoVersions converts the metadata to FileInfoVersions where possible.
// Metadata that cannot be decoded is skipped.
func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo, commonPrefixes []string) {
func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) {
objects = make([]ObjectInfo, 0, m.len())
prevPrefix := ""
for _, entry := range m.o {
@ -380,7 +387,11 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob
continue
}
prevPrefix = currPrefix
commonPrefixes = append(commonPrefixes, currPrefix)
objects = append(objects, ObjectInfo{
IsDir: true,
Bucket: bucket,
Name: currPrefix,
})
continue
}
}
@ -405,12 +416,15 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob
continue
}
prevPrefix = currPrefix
commonPrefixes = append(commonPrefixes, currPrefix)
continue
objects = append(objects, ObjectInfo{
IsDir: true,
Bucket: bucket,
Name: currPrefix,
})
}
}
return objects, commonPrefixes
return objects
}
// forwardTo will truncate m so only entries that are s or after is in the list.

View File

@ -49,6 +49,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
"test-bucket-single-object",
// Listing uncommon delimiter.
"test-bucket-delimiter",
// Listing prefixes > maxKeys
"test-bucket-max-keys-prefixes",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{})
@ -79,6 +81,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[3], "A/B", "contentstring", nil},
{testBuckets[4], "file1/receipt.json", "content", nil},
{testBuckets[4], "file1/guidSplunk-aaaa/file", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-10/issue", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-11/issue", "content", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
@ -467,6 +471,11 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
},
Prefixes: []string{"file1/guidSplunk"},
},
// ListObjectsResult-36 list with nextmarker prefix and maxKeys set to 1.
{
IsTruncated: true,
Prefixes: []string{"dir/day_id=2017-10-10/"},
},
}
testCases := []struct {
@ -589,6 +598,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[4], "", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing an object with uncommon delimiter and matching prefix
{testBuckets[4], "file1/", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing at prefix with expected prefix markers
{testBuckets[5], "dir/", "", SlashSeparator, 1, resultCases[36], nil, true},
}
for i, testCase := range testCases {
@ -661,27 +672,16 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
}
if testCase.result.IsTruncated && result.NextMarker == "" {
t.Errorf("Test %d: %s: Expected NextContinuationToken to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType)
t.Errorf("Test %d: %s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType)
}
if !testCase.result.IsTruncated && result.NextMarker != "" {
if !result.IsTruncated || len(result.Objects) == 0 {
t.Errorf("Test %d: %s: Expected NextContinuationToken to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, result.NextMarker)
t.Errorf("Test %d: %s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, result.NextMarker)
}
}
}
// Take ListObject treeWalk go-routine to completion, if available in the treewalk pool.
for result.IsTruncated {
result, err = obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, result.NextMarker, testCase.delimiter, 1000)
if err != nil {
t.Fatal(err)
}
if !testCase.result.IsTruncated && len(result.Objects) > 0 {
t.Errorf("expected to get all objects in the previous call, but got %d more", len(result.Objects))
}
}
})
}
}
@ -714,6 +714,8 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
"test-bucket-single-object",
// Listing uncommon delimiter.
"test-bucket-delimiter",
// Listing prefixes > maxKeys
"test-bucket-max-keys-prefixes",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{VersioningEnabled: true})
@ -748,7 +750,10 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
{testBuckets[3], "A/B", "contentstring", nil},
{testBuckets[4], "file1/receipt.json", "content", nil},
{testBuckets[4], "file1/guidSplunk-aaaa/file", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-10/issue", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-11/issue", "content", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
@ -1140,6 +1145,11 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
},
Prefixes: []string{"file1/guidSplunk"},
},
// ListObjectsResult-36 list with nextmarker prefix and maxKeys set to 1.
{
IsTruncated: true,
Prefixes: []string{"dir/day_id=2017-10-10/"},
},
}
testCases := []struct {
@ -1262,6 +1272,8 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
{testBuckets[4], "", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing an object with uncommon delimiter and matching prefix
{testBuckets[4], "file1/", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing at prefix with expected prefix markers
{testBuckets[5], "dir/", "", SlashSeparator, 1, resultCases[36], nil, true},
}
for i, testCase := range testCases {
@ -1326,26 +1338,14 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
}
if testCase.result.IsTruncated && result.NextMarker == "" {
t.Errorf("%s: Expected NextContinuationToken to contain a string since listing is truncated, but instead found it to be empty", instanceType)
t.Errorf("%s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", instanceType)
}
if !testCase.result.IsTruncated && result.NextMarker != "" {
if !result.IsTruncated || len(result.Objects) == 0 {
t.Errorf("%s: Expected NextContinuationToken to be empty since listing is not truncated, but instead found `%v`", instanceType, result.NextMarker)
t.Errorf("%s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", instanceType, result.NextMarker)
}
}
}
// Take ListObject treeWalk go-routine to completion, if available in the treewalk pool.
for result.IsTruncated {
result, err = obj.ListObjectVersions(context.Background(), testCase.bucketName,
testCase.prefix, result.NextMarker, "", testCase.delimiter, 1000)
if err != nil {
t.Fatal(err)
}
if !testCase.result.IsTruncated && len(result.Objects) > 0 {
t.Errorf("expected to get all objects in the previous call, but got %d more", len(result.Objects))
}
}
})
}

View File

@ -570,11 +570,13 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
nextMarker := ""
// Fetch all the objects
for {
// Limit browser to 1000 batches to be more responsive, scrolling friendly.
lo, err := listObjects(ctx, args.BucketName, args.Prefix, nextMarker, SlashSeparator, 1000)
// Limit browser to defaults batches to be more responsive, scrolling friendly.
lo, err := listObjects(ctx, args.BucketName, args.Prefix, nextMarker, SlashSeparator, -1)
if err != nil {
return &json2.Error{Message: err.Error()}
}
nextMarker = lo.NextMarker
for i := range lo.Objects {
lo.Objects[i].Size, err = lo.Objects[i].GetActualSize()
if err != nil {
@ -596,8 +598,6 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
})
}
nextMarker = lo.NextMarker
// Return when there are no more objects
if !lo.IsTruncated {
return nil

View File

@ -23,5 +23,5 @@ fi
test_run_dir="$MINT_RUN_CORE_DIR/minio-py"
pip3 install --user faker
pip3 install minio==${MINIO_PY_VERSION}
pip3 install minio=="${MINIO_PY_VERSION}"
$WGET --output-document="$test_run_dir/tests.py" "https://raw.githubusercontent.com/minio/minio-py/${MINIO_PY_VERSION}/tests/functional/tests.py"

View File

@ -183,7 +183,7 @@ function main()
done
## Report when all tests in run_list are run
if [ $i -eq "$count" ]; then
if [ "$i" -eq "$count" ]; then
echo -e "\nAll tests ran successfully"
else
echo -e "\nExecuted $i out of $count tests successfully."