add more directory marker tests and fix a bug (#13871)

ListObjects() should never list a delete-marked folder
if latest is delete marker and delimiter is not provided.

ListObjectVersions() should list a delete-marked folder
even if latest is delete marker and delimiter is not
provided.

Enhance further versioning listing on the buckets
This commit is contained in:
Harshavardhana 2021-12-09 14:59:23 -08:00 committed by GitHub
parent 84c690cb07
commit 2f1e8ba612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 73 deletions

View File

@ -1054,6 +1054,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
Marker: marker, Marker: marker,
InclDeleted: true, InclDeleted: true,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
Versioned: true,
} }
merged, err := z.listPath(ctx, &opts) merged, err := z.listPath(ctx, &opts)

View File

@ -91,6 +91,9 @@ type listPathOptions struct {
// A transient result will never be returned from the cache so knowing the list id is required. // A transient result will never be returned from the cache so knowing the list id is required.
Transient bool Transient bool
// Versioned is this a ListObjectVersions call.
Versioned bool
// pool and set of where the cache is located. // pool and set of where the cache is located.
pool, set int pool, set int
} }
@ -156,7 +159,7 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
resCh = nil resCh = nil
continue continue
} }
if !o.IncludeDirectories && (entry.isDir() || (entry.isObjectDir() && entry.isLatestDeletemarker())) { if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
continue continue
} }
if o.Marker != "" && entry.name < o.Marker { if o.Marker != "" && entry.name < o.Marker {
@ -324,7 +327,7 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
pastPrefix = true pastPrefix = true
return false return false
} }
if !o.IncludeDirectories && (entry.isDir() || (entry.isObjectDir() && entry.isLatestDeletemarker())) { if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
return true return true
} }
if !entry.isInDir(o.Prefix, o.Separator) { if !entry.isInDir(o.Prefix, o.Separator) {
@ -343,7 +346,7 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
} }
// We should not need to filter more. // We should not need to filter more.
return r.readN(o.Limit, o.InclDeleted, o.IncludeDirectories, o.Prefix) return r.readN(o.Limit, o.InclDeleted, o.IncludeDirectories, o.Versioned, o.Prefix)
} }
func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) { func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {

View File

@ -457,7 +457,7 @@ func (r *metacacheReader) forwardTo(s string) error {
// Will return io.EOF if end of stream is reached. // Will return io.EOF if end of stream is reached.
// If requesting 0 objects nil error will always be returned regardless of at end of stream. // If requesting 0 objects nil error will always be returned regardless of at end of stream.
// Use peek to determine if at end of stream. // Use peek to determine if at end of stream.
func (r *metacacheReader) readN(n int, inclDeleted, inclDirs bool, prefix string) (metaCacheEntriesSorted, error) { func (r *metacacheReader) readN(n int, inclDeleted, inclDirs, inclVersions bool, prefix string) (metaCacheEntriesSorted, error) {
r.checkInit() r.checkInit()
if n == 0 { if n == 0 {
return metaCacheEntriesSorted{}, nil return metaCacheEntriesSorted{}, nil
@ -528,7 +528,7 @@ func (r *metacacheReader) readN(n int, inclDeleted, inclDirs bool, prefix string
metaDataPoolPut(meta.metadata) metaDataPoolPut(meta.metadata)
meta.metadata = nil meta.metadata = nil
} }
if !inclDirs && (meta.isDir() || (meta.isObjectDir() && meta.isLatestDeletemarker())) { if !inclDirs && (meta.isDir() || (!inclVersions && meta.isObjectDir() && meta.isLatestDeletemarker())) {
continue continue
} }
if !inclDeleted && meta.isLatestDeletemarker() && meta.isObject() && !meta.isObjectDir() { if !inclDeleted && meta.isLatestDeletemarker() && meta.isObject() && !meta.isObjectDir() {

View File

@ -40,7 +40,7 @@ func loadMetacacheSample(t testing.TB) *metacacheReader {
func loadMetacacheSampleEntries(t testing.TB) metaCacheEntriesSorted { func loadMetacacheSampleEntries(t testing.TB) metaCacheEntriesSorted {
r := loadMetacacheSample(t) r := loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err := r.readN(-1, false, true, "") entries, err := r.readN(-1, false, true, false, "")
if err != io.EOF { if err != io.EOF {
t.Fatal(err) t.Fatal(err)
} }
@ -64,7 +64,7 @@ func Test_metacacheReader_readNames(t *testing.T) {
func Test_metacacheReader_readN(t *testing.T) { func Test_metacacheReader_readN(t *testing.T) {
r := loadMetacacheSample(t) r := loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err := r.readN(-1, false, true, "") entries, err := r.readN(-1, false, true, false, "")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -79,7 +79,7 @@ func Test_metacacheReader_readN(t *testing.T) {
} }
want = want[:0] want = want[:0]
entries, err = r.readN(0, false, true, "") entries, err = r.readN(0, false, true, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -90,7 +90,7 @@ func Test_metacacheReader_readN(t *testing.T) {
// Reload. // Reload.
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(0, false, true, "") entries, err = r.readN(0, false, true, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -98,7 +98,7 @@ func Test_metacacheReader_readN(t *testing.T) {
t.Fatal("unexpected length:", entries.len(), "want:", len(want)) t.Fatal("unexpected length:", entries.len(), "want:", len(want))
} }
entries, err = r.readN(5, false, true, "") entries, err = r.readN(5, false, true, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -117,7 +117,7 @@ func Test_metacacheReader_readN(t *testing.T) {
func Test_metacacheReader_readNDirs(t *testing.T) { func Test_metacacheReader_readNDirs(t *testing.T) {
r := loadMetacacheSample(t) r := loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err := r.readN(-1, false, true, "") entries, err := r.readN(-1, false, true, false, "")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -138,7 +138,7 @@ func Test_metacacheReader_readNDirs(t *testing.T) {
want = noDirs want = noDirs
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(-1, false, false, "") entries, err = r.readN(-1, false, false, false, "")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -152,7 +152,7 @@ func Test_metacacheReader_readNDirs(t *testing.T) {
} }
want = want[:0] want = want[:0]
entries, err = r.readN(0, false, false, "") entries, err = r.readN(0, false, false, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -163,7 +163,7 @@ func Test_metacacheReader_readNDirs(t *testing.T) {
// Reload. // Reload.
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(0, false, false, "") entries, err = r.readN(0, false, false, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -171,7 +171,7 @@ func Test_metacacheReader_readNDirs(t *testing.T) {
t.Fatal("unexpected length:", entries.len(), "want:", len(want)) t.Fatal("unexpected length:", entries.len(), "want:", len(want))
} }
entries, err = r.readN(5, false, false, "") entries, err = r.readN(5, false, false, false, "")
if err != nil { if err != nil {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -190,7 +190,7 @@ func Test_metacacheReader_readNDirs(t *testing.T) {
func Test_metacacheReader_readNPrefix(t *testing.T) { func Test_metacacheReader_readNPrefix(t *testing.T) {
r := loadMetacacheSample(t) r := loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err := r.readN(-1, false, true, "src/compress/bzip2/") entries, err := r.readN(-1, false, true, false, "src/compress/bzip2/")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -206,7 +206,7 @@ func Test_metacacheReader_readNPrefix(t *testing.T) {
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(-1, false, true, "src/nonexist") entries, err = r.readN(-1, false, true, false, "src/nonexist")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -222,7 +222,7 @@ func Test_metacacheReader_readNPrefix(t *testing.T) {
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(-1, false, true, "src/a") entries, err = r.readN(-1, false, true, false, "src/a")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }
@ -238,7 +238,7 @@ func Test_metacacheReader_readNPrefix(t *testing.T) {
r = loadMetacacheSample(t) r = loadMetacacheSample(t)
defer r.Close() defer r.Close()
entries, err = r.readN(-1, false, true, "src/compress/zlib/e") entries, err = r.readN(-1, false, true, false, "src/compress/zlib/e")
if err != io.EOF { if err != io.EOF {
t.Fatal(err, entries.len()) t.Fatal(err, entries.len())
} }

View File

@ -93,8 +93,6 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
resultCases := []ListObjectsInfo{ resultCases := []ListObjectsInfo{
// ListObjectsResult-0.
// Testing list only the top-level folder.
{ {
IsTruncated: false, IsTruncated: false,
Prefixes: []string{"unique/folder/"}, Prefixes: []string{"unique/folder/"},
@ -111,32 +109,70 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
}, },
} }
resultCasesV := []ListObjectVersionsInfo{
{
IsTruncated: false,
Prefixes: []string{"unique/folder/"},
},
{
IsTruncated: false,
Objects: []ObjectInfo{
{
Name: "unique/folder/",
DeleteMarker: true,
},
{
Name: "unique/folder/",
DeleteMarker: false,
},
{
Name: "unique/folder/1.txt",
DeleteMarker: false,
},
},
},
}
testCases := []struct { testCases := []struct {
// Inputs to ListObjects. // Inputs to ListObjects.
bucketName string bucketName string
prefix string prefix string
marker string marker string
delimiter string delimiter string
maxKeys int32 maxKeys int
versioned bool
// Expected output of ListObjects. // Expected output of ListObjects.
result ListObjectsInfo resultL ListObjectsInfo
err error resultV ListObjectVersionsInfo
err error
// Flag indicating whether the test is expected to pass or not. // Flag indicating whether the test is expected to pass or not.
shouldPass bool shouldPass bool
}{ }{
{testBuckets[0], "unique/", "", "/", 1000, resultCases[0], nil, true}, {testBuckets[0], "unique/", "", "/", 1000, false, resultCases[0], ListObjectVersionsInfo{}, nil, true},
{testBuckets[0], "unique/folder", "", "/", 1000, resultCases[0], nil, true}, {testBuckets[0], "unique/folder", "", "/", 1000, false, resultCases[0], ListObjectVersionsInfo{}, nil, true},
{testBuckets[0], "unique/", "", "", 1000, resultCases[1], nil, true}, {testBuckets[0], "unique/", "", "", 1000, false, resultCases[1], ListObjectVersionsInfo{}, nil, true},
{testBuckets[1], "unique/", "", "/", 1000, resultCases[0], nil, true}, {testBuckets[1], "unique/", "", "/", 1000, false, resultCases[0], ListObjectVersionsInfo{}, nil, true},
{testBuckets[1], "unique/folder/", "", "/", 1000, resultCases[2], nil, true}, {testBuckets[1], "unique/folder/", "", "/", 1000, false, resultCases[2], ListObjectVersionsInfo{}, nil, true},
{testBuckets[0], "unique/", "", "/", 1000, true, ListObjectsInfo{}, resultCasesV[0], nil, true},
{testBuckets[0], "unique/", "", "", 1000, true, ListObjectsInfo{}, resultCasesV[1], nil, true},
} }
for i, testCase := range testCases { for i, testCase := range testCases {
testCase := testCase testCase := testCase
t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
t.Log("ListObjects, bucket:", testCase.bucketName, "prefix:", testCase.prefix, "marker:", testCase.marker, "delimiter:", testCase.delimiter, "maxkeys:", testCase.maxKeys) t.Log("ListObjects, bucket:", testCase.bucketName, "prefix:",
result, err := obj.ListObjects(context.Background(), testCase.bucketName, testCase.prefix, "marker:", testCase.marker, "delimiter:",
testCase.prefix, testCase.marker, testCase.delimiter, int(testCase.maxKeys)) testCase.delimiter, "maxkeys:", testCase.maxKeys)
var err error
var resultL ListObjectsInfo
var resultV ListObjectVersionsInfo
if testCase.versioned {
resultV, err = obj.ListObjectVersions(context.Background(), testCase.bucketName,
testCase.prefix, testCase.marker, "", testCase.delimiter, testCase.maxKeys)
} else {
resultL, err = obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, testCase.marker, testCase.delimiter, testCase.maxKeys)
}
if err != nil && testCase.shouldPass { if err != nil && testCase.shouldPass {
t.Errorf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, err.Error()) t.Errorf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, err.Error())
} }
@ -158,53 +194,101 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te
// and in the output. On failure calling t.Fatalf, // and in the output. On failure calling t.Fatalf,
// otherwise it may lead to index out of range error in // otherwise it may lead to index out of range error in
// assertion following this. // assertion following this.
if len(testCase.result.Objects) != len(result.Objects) { if !testCase.versioned {
t.Logf("want: %v", objInfoNames(testCase.result.Objects)) if len(testCase.resultL.Objects) != len(resultL.Objects) {
t.Logf("got: %v", objInfoNames(result.Objects)) t.Logf("want: %v", objInfoNames(testCase.resultL.Objects))
t.Errorf("Test %d: %s: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, instanceType, len(testCase.result.Objects), len(result.Objects)) t.Logf("got: %v", objInfoNames(resultL.Objects))
} t.Errorf("Test %d: %s: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, instanceType, len(testCase.resultL.Objects), len(resultL.Objects))
for j := 0; j < len(testCase.result.Objects); j++ {
if j >= len(result.Objects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name)
continue
} }
if testCase.result.Objects[j].Name != result.Objects[j].Name { for j := 0; j < len(testCase.resultL.Objects); j++ {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, result.Objects[j].Name) if j >= len(resultL.Objects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.resultL.Objects[j].Name)
continue
}
if testCase.resultL.Objects[j].Name != resultL.Objects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.resultL.Objects[j].Name, resultL.Objects[j].Name)
}
} }
}
if len(testCase.result.Prefixes) != len(result.Prefixes) { if len(testCase.resultL.Prefixes) != len(resultL.Prefixes) {
t.Logf("want: %v", testCase.result.Prefixes) t.Logf("want: %v", testCase.resultL.Prefixes)
t.Logf("got: %v", result.Prefixes) t.Logf("got: %v", resultL.Prefixes)
t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", i+1, instanceType, len(testCase.result.Prefixes), len(result.Prefixes)) t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", i+1, instanceType, len(testCase.resultL.Prefixes), len(resultL.Prefixes))
}
for j := 0; j < len(testCase.result.Prefixes); j++ {
if j >= len(result.Prefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j])
continue
} }
if testCase.result.Prefixes[j] != result.Prefixes[j] { for j := 0; j < len(testCase.resultL.Prefixes); j++ {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], result.Prefixes[j]) if j >= len(resultL.Prefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.resultL.Prefixes[j])
continue
}
if testCase.resultL.Prefixes[j] != resultL.Prefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.resultL.Prefixes[j], resultL.Prefixes[j])
}
}
if testCase.resultL.IsTruncated != resultL.IsTruncated {
// Allow an extra continuation token.
if !resultL.IsTruncated || len(resultL.Objects) == 0 {
t.Errorf("Test %d: %s: Expected IsTruncated flag to be %v, but instead found it to be %v", i+1, instanceType, testCase.resultL.IsTruncated, resultL.IsTruncated)
}
}
if testCase.resultL.IsTruncated && resultL.NextMarker == "" {
t.Errorf("Test %d: %s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType)
}
if !testCase.resultL.IsTruncated && resultL.NextMarker != "" {
if !resultL.IsTruncated || len(resultL.Objects) == 0 {
t.Errorf("Test %d: %s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, resultL.NextMarker)
}
}
} else {
if len(testCase.resultV.Objects) != len(resultV.Objects) {
t.Logf("want: %v", objInfoNames(testCase.resultV.Objects))
t.Logf("got: %v", objInfoNames(resultV.Objects))
t.Errorf("Test %d: %s: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, instanceType, len(testCase.resultV.Objects), len(resultV.Objects))
}
for j := 0; j < len(testCase.resultV.Objects); j++ {
if j >= len(resultV.Objects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.resultV.Objects[j].Name)
continue
}
if testCase.resultV.Objects[j].Name != resultV.Objects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.resultV.Objects[j].Name, resultV.Objects[j].Name)
}
}
if len(testCase.resultV.Prefixes) != len(resultV.Prefixes) {
t.Logf("want: %v", testCase.resultV.Prefixes)
t.Logf("got: %v", resultV.Prefixes)
t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", i+1, instanceType, len(testCase.resultV.Prefixes), len(resultV.Prefixes))
}
for j := 0; j < len(testCase.resultV.Prefixes); j++ {
if j >= len(resultV.Prefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.resultV.Prefixes[j])
continue
}
if testCase.resultV.Prefixes[j] != resultV.Prefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.resultV.Prefixes[j], resultV.Prefixes[j])
}
}
if testCase.resultV.IsTruncated != resultV.IsTruncated {
// Allow an extra continuation token.
if !resultV.IsTruncated || len(resultV.Objects) == 0 {
t.Errorf("Test %d: %s: Expected IsTruncated flag to be %v, but instead found it to be %v", i+1, instanceType, testCase.resultV.IsTruncated, resultV.IsTruncated)
}
}
if testCase.resultV.IsTruncated && resultV.NextMarker == "" {
t.Errorf("Test %d: %s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType)
}
if !testCase.resultV.IsTruncated && resultV.NextMarker != "" {
if !resultV.IsTruncated || len(resultV.Objects) == 0 {
t.Errorf("Test %d: %s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, resultV.NextMarker)
}
} }
} }
if testCase.result.IsTruncated != result.IsTruncated {
// Allow an extra continuation token.
if !result.IsTruncated || len(result.Objects) == 0 {
t.Errorf("Test %d: %s: Expected IsTruncated flag to be %v, but instead found it to be %v", i+1, instanceType, testCase.result.IsTruncated, result.IsTruncated)
}
}
if testCase.result.IsTruncated && result.NextMarker == "" {
t.Errorf("Test %d: %s: Expected NextMarker to contain a string since listing is truncated, but instead found it to be empty", i+1, instanceType)
}
if !testCase.result.IsTruncated && result.NextMarker != "" {
if !result.IsTruncated || len(result.Objects) == 0 {
t.Errorf("Test %d: %s: Expected NextMarker to be empty since listing is not truncated, but instead found `%v`", i+1, instanceType, result.NextMarker)
}
}
} }
}) })
} }