fix: do not list delete-marked objects (#13864)

Delete-marked objects should not be considered
for listing when the listing is delimited. This
issue was introduced in PR #13804, which was mainly
meant to address the listing of directories when
the listing is delimited.

This PR fixes the behavior properly and adds tests
to ensure that we behave in accordance with how
the S3 API behaves for ListObjects() without
versions.
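
As a rough illustration of the intended semantics (a minimal sketch, not the actual MinIO code paths), the snippet below models a non-versioned ListObjects over entries whose latest version may be a delete marker: such entries are hidden from the results, and with a delimiter only prefixes that still contain live objects are reported. The `entry` type and `listVisible` helper are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a hypothetical stand-in for a listing entry; in MinIO the real
// type is metaCacheEntry. latestIsDeleteMarker mimics isLatestDeletemarker().
type entry struct {
	name                 string
	latestIsDeleteMarker bool
}

// listVisible models S3 ListObjects (non-versioned) semantics: entries whose
// latest version is a delete marker are skipped; with a delimiter, a common
// prefix is only reported when it contains at least one live object.
func listVisible(entries []entry, prefix, delimiter string) (objects, commonPrefixes []string) {
	seen := map[string]bool{}
	for _, e := range entries {
		if !strings.HasPrefix(e.name, prefix) || e.latestIsDeleteMarker {
			continue // delete-marked: invisible to a non-versioned listing
		}
		rest := strings.TrimPrefix(e.name, prefix)
		if delimiter != "" {
			if i := strings.Index(rest, delimiter); i >= 0 {
				p := prefix + rest[:i+len(delimiter)]
				if !seen[p] {
					seen[p] = true
					commonPrefixes = append(commonPrefixes, p)
				}
				continue
			}
		}
		objects = append(objects, e.name)
	}
	return objects, commonPrefixes
}

func main() {
	entries := []entry{
		{name: "unique/folder/", latestIsDeleteMarker: true},      // delete-marked "directory object"
		{name: "unique/folder/1.txt", latestIsDeleteMarker: true}, // delete-marked object
		{name: "unique/live/2.txt"},                               // live object
	}
	objs, prefixes := listVisible(entries, "unique/", "/")
	fmt.Println(objs, prefixes) // [] [unique/live/]
}
```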
Harshavardhana authored 2021-12-08 17:34:52 -08:00, committed by GitHub
parent 0a66a6f1e5
commit dcff6c996d
7 changed files with 83 additions and 16 deletions


@@ -1118,15 +1118,12 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
 	}
 	opts := listPathOptions{
 		Bucket: bucket,
 		Prefix: prefix,
 		Separator: delimiter,
 		Limit: maxKeysPlusOne(maxKeys, marker != ""),
 		Marker: marker,
-		// When delimiter is set make sure to include delete markers
-		// necessary to capture proper CommonPrefixes as expected
-		// in the response as per AWS S3.
-		InclDeleted: delimiter != "",
+		InclDeleted: false,
 		AskDisks: globalAPIConfig.getListQuorum(),
 	}
 	merged, err := z.listPath(ctx, &opts)


@@ -53,6 +53,11 @@ func (e metaCacheEntry) isObject() bool {
 	return len(e.metadata) > 0
 }
+// isObjectDir returns if the entry is representing an object__XL_DIR__
+func (e metaCacheEntry) isObjectDir() bool {
+	return len(e.metadata) > 0 && strings.HasSuffix(e.name, slashSeparator)
+}
 // hasPrefix returns whether an entry has a specific prefix
 func (e metaCacheEntry) hasPrefix(s string) bool {
 	return strings.HasPrefix(e.name, s)
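
As a hedged aside, the sketch below restates what the new isObjectDir helper distinguishes, using a hypothetical `cacheEntry` type instead of MinIO's metaCacheEntry: an "object directory" is a key that ends with the path separator but carries its own object metadata, unlike a purely synthetic listing directory.

```go
package main

import (
	"fmt"
	"strings"
)

const slashSeparator = "/"

// cacheEntry is a hypothetical stand-in for metaCacheEntry.
type cacheEntry struct {
	name     string
	metadata []byte // non-empty when the entry has its own object metadata
}

// isObjectDir mirrors the intent of the new helper: the entry is an object
// stored under a directory-style key (trailing slash) with real metadata.
func (e cacheEntry) isObjectDir() bool {
	return len(e.metadata) > 0 && strings.HasSuffix(e.name, slashSeparator)
}

func main() {
	fmt.Println(cacheEntry{name: "unique/folder/", metadata: []byte("xl")}.isObjectDir())      // true
	fmt.Println(cacheEntry{name: "unique/folder/"}.isObjectDir())                              // false: plain listing dir
	fmt.Println(cacheEntry{name: "unique/folder/1.txt", metadata: []byte("xl")}.isObjectDir()) // false: regular object
}
```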


@@ -156,7 +156,7 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
 			resCh = nil
 			continue
 		}
-		if !o.IncludeDirectories && entry.isDir() {
+		if !o.IncludeDirectories && (entry.isDir() || (entry.isObjectDir() && entry.isLatestDeletemarker())) {
 			continue
 		}
 		if o.Marker != "" && entry.name < o.Marker {
@@ -168,7 +168,7 @@
 		if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
 			continue
 		}
-		if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
+		if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
 			continue
 		}
 		if o.Limit > 0 && results.len() >= o.Limit {
@@ -324,13 +324,13 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
 			pastPrefix = true
 			return false
 		}
-		if !o.IncludeDirectories && entry.isDir() {
+		if !o.IncludeDirectories && (entry.isDir() || (entry.isObjectDir() && entry.isLatestDeletemarker())) {
 			return true
 		}
 		if !entry.isInDir(o.Prefix, o.Separator) {
 			return true
 		}
-		if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
+		if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
 			return entries.len() < o.Limit
 		}
 		entries.o = append(entries.o, entry)


@@ -528,10 +528,10 @@ func (r *metacacheReader) readN(n int, inclDeleted, inclDirs bool, prefix string
 			metaDataPoolPut(meta.metadata)
 			meta.metadata = nil
 		}
-		if !inclDirs && meta.isDir() {
+		if !inclDirs && (meta.isDir() || (meta.isObjectDir() && meta.isLatestDeletemarker())) {
 			continue
 		}
-		if !inclDeleted && meta.isLatestDeletemarker() {
+		if !inclDeleted && meta.isLatestDeletemarker() && meta.isObject() && !meta.isObjectDir() {
 			continue
 		}
 		res = append(res, meta)
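
The same pair of skip rules now appears in gatherResults, filter, and readN above. The following self-contained sketch (hypothetical `listEntry` type and field names, not the MinIO API) restates the rule in one place: a delete-marked object directory is suppressed along with plain directories, while a delete-marked regular object is suppressed only when deleted entries are excluded.

```go
package main

import (
	"fmt"
	"strings"
)

// listEntry is a hypothetical simplification of a listing entry.
type listEntry struct {
	name           string
	hasMetadata    bool // isObject()/isObjectDir() both require metadata
	isDeleteMarker bool // latest version is a delete marker
}

func (e listEntry) isDir() bool       { return !e.hasMetadata && strings.HasSuffix(e.name, "/") }
func (e listEntry) isObjectDir() bool { return e.hasMetadata && strings.HasSuffix(e.name, "/") }
func (e listEntry) isObject() bool    { return e.hasMetadata }

// skip restates the filtering rule added in this commit: directories (and
// delete-marked object directories) are dropped unless directories are
// requested; delete-marked regular objects are dropped unless deleted
// entries are requested.
func skip(e listEntry, inclDeleted, inclDirs bool) bool {
	if !inclDirs && (e.isDir() || (e.isObjectDir() && e.isDeleteMarker)) {
		return true
	}
	if !inclDeleted && e.isObject() && e.isDeleteMarker && !e.isObjectDir() {
		return true
	}
	return false
}

func main() {
	fmt.Println(skip(listEntry{name: "unique/folder/", hasMetadata: true, isDeleteMarker: true}, false, false))     // true
	fmt.Println(skip(listEntry{name: "unique/folder/1.txt", hasMetadata: true, isDeleteMarker: true}, false, false)) // true
	fmt.Println(skip(listEntry{name: "unique/folder/1.txt", hasMetadata: true}, false, false))                       // false: live object is listed
}
```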


@@ -42,6 +42,8 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
 	testBuckets := []string{
 		// This bucket is used for testing ListObject operations.
 		"test-bucket-folders",
+		// This bucket has file delete marker.
+		"test-bucket-files",
 	}
 	for _, bucket := range testBuckets {
 		err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{
@@ -62,6 +64,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
 	}{
 		{testBuckets[0], "unique/folder/", "", nil, true},
 		{testBuckets[0], "unique/folder/1.txt", "content", nil, false},
+		{testBuckets[1], "unique/folder/1.txt", "content", nil, true},
 	}
 	for _, object := range testObjects {
 		md5Bytes := md5.Sum([]byte(object.content))
@@ -102,6 +105,10 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
 				{Name: "unique/folder/1.txt"},
 			},
 		},
+		{
+			IsTruncated: false,
+			Objects:     []ObjectInfo{},
+		},
 	}
 	testCases := []struct {
@@ -120,6 +127,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
 		{testBuckets[0], "unique/", "", "/", 1000, resultCases[0], nil, true},
 		{testBuckets[0], "unique/folder", "", "/", 1000, resultCases[0], nil, true},
 		{testBuckets[0], "unique/", "", "", 1000, resultCases[1], nil, true},
+		{testBuckets[1], "unique/folder/", "", "/", 1000, resultCases[2], nil, true},
 	}
 	for i, testCase := range testCases {


@@ -0,0 +1,14 @@
+# To run locally an OpenLDAP instance using Docker
+# $ docker-compose -f ldap.yaml up -d
+version: '3.7'
+services:
+  openldap:
+    image: quay.io/minio/openldap
+    ports:
+      - "389:389"
+      - "636:636"
+    environment:
+      LDAP_ORGANIZATION: "MinIO Inc"
+      LDAP_DOMAIN: "min.io"
+      LDAP_ADMIN_PASSWORD: "admin"


@@ -136,4 +136,47 @@ if [ $? -eq 0 ]; then
     exit_1;
 fi
-cleanup
+./mc mb minio1/newbucket
+sleep 5
+./mc stat minio2/newbucket
+if [ $? -eq 1 ]; then
+    echo "expecting bucket to be present. exiting.."
+    exit_1;
+fi
+./mc stat minio3/newbucket
+if [ $? -eq 1 ]; then
+    echo "expecting bucket to be present. exiting.."
+    exit_1;
+fi
+./mc cp README.md minio2/newbucket/
+sleep 5
+./mc stat minio1/newbucket/README.md
+if [ $? -eq 1 ]; then
+    echo "expecting object to be present. exiting.."
+    exit_1;
+fi
+./mc stat minio3/newbucket/README.md
+if [ $? -eq 1 ]; then
+    echo "expecting object to be present. exiting.."
+    exit_1;
+fi
+./mc rm minio3/newbucket/README.md
+sleep 5
+./mc stat minio2/newbucket/README.md
+if [ $? -eq 0 ]; then
+    echo "expected file to be deleted, exiting.."
+    exit_1;
+fi
+./mc stat minio1/newbucket/README.md
+if [ $? -eq 0 ]; then
+    echo "expected file to be deleted, exiting.."
+    exit_1;
+fi