Fix listing empty directory in recursive mode (#7613)

After recent listing refactor, recursive list doesn't return empty
directories, this commit will fix the behavior and add unit tests
so it won't happen again.
This commit is contained in:
Anis Elleuch 2019-05-06 15:52:42 +01:00 committed by kannappanr
parent efbc665ad3
commit 08b9244c48
2 changed files with 83 additions and 31 deletions

View File

@ -40,6 +40,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
testBuckets := []string{
// This bucket is used for testing ListObject operations.
"test-bucket-list-object",
// This bucket will be tested with empty directories
"test-bucket-empty-dir",
// Will not store any objects in this bucket,
// Its to test ListObjects on an empty bucket.
"empty-bucket",
@ -53,23 +55,27 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
var err error
testObjects := []struct {
parentBucket string
name string
content string
meta map[string]string
}{
{"Asia-maps.png", "asis-maps", map[string]string{"content-type": "image/png"}},
{"Asia/India/India-summer-photos-1", "contentstring", nil},
{"Asia/India/Karnataka/Bangalore/Koramangala/pics", "contentstring", nil},
{"newPrefix0", "newPrefix0", nil},
{"newPrefix1", "newPrefix1", nil},
{"newzen/zen/recurse/again/again/again/pics", "recurse", nil},
{"obj0", "obj0", nil},
{"obj1", "obj1", nil},
{"obj2", "obj2", nil},
{testBuckets[0], "Asia-maps.png", "asis-maps", map[string]string{"content-type": "image/png"}},
{testBuckets[0], "Asia/India/India-summer-photos-1", "contentstring", nil},
{testBuckets[0], "Asia/India/Karnataka/Bangalore/Koramangala/pics", "contentstring", nil},
{testBuckets[0], "newPrefix0", "newPrefix0", nil},
{testBuckets[0], "newPrefix1", "newPrefix1", nil},
{testBuckets[0], "newzen/zen/recurse/again/again/again/pics", "recurse", nil},
{testBuckets[0], "obj0", "obj0", nil},
{testBuckets[0], "obj1", "obj1", nil},
{testBuckets[0], "obj2", "obj2", nil},
{testBuckets[1], "obj1", "obj1", nil},
{testBuckets[1], "obj2", "obj2", nil},
{testBuckets[1], "temporary/0/", "", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err = obj.PutObject(context.Background(), testBuckets[0], object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
@ -361,6 +367,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
Objects: []ObjectInfo{
{Name: "Asia-maps.png"},
},
Prefixes: []string{"Asia/"},
},
// ListObjectsResult-26.
// prefix = "new" and delimiter is set in the testCase.(testCase 58).
@ -370,6 +377,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{Name: "newPrefix0"},
{Name: "newPrefix1"},
},
Prefixes: []string{"newzen/"},
},
// ListObjectsResult-27.
// Prefix is set to "Asia/India/" in the testCase, and delimiter is set to forward slash '/' (testCase 59).
@ -378,6 +386,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
Objects: []ObjectInfo{
{Name: "Asia/India/India-summer-photos-1"},
},
Prefixes: []string{"Asia/India/Karnataka/"},
},
// ListObjectsResult-28.
// Marker is set to "Asia/India/India-summer-photos-1" and delimiter set in the testCase, (testCase 60).
@ -390,6 +399,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{Name: "obj1"},
{Name: "obj2"},
},
Prefixes: []string{"newzen/"},
},
// ListObjectsResult-29.
// Marker is set to "Asia/India/Karnataka/Bangalore/Koramangala/pics" in the testCase and delimeter set, (testCase 61).
@ -402,6 +412,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{Name: "obj1"},
{Name: "obj2"},
},
Prefixes: []string{"newzen/"},
},
// ListObjectsResult-30.
// Prefix and Delimiter is set to '/', (testCase 62).
@ -409,6 +420,31 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
IsTruncated: false,
Objects: []ObjectInfo{},
},
// ListObjectsResult-31 Empty directory, recursive listing
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "obj1"},
{Name: "obj2"},
{Name: "temporary/0/"},
},
},
// ListObjectsResult-32 Empty directory, non recursive listing
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "obj1"},
{Name: "obj2"},
},
Prefixes: []string{"temporary/"},
},
// ListObjectsResult-33 Listing empty directory only
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "temporary/0/"},
},
},
}
testCases := []struct {
@ -519,9 +555,14 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{"test-bucket-list-object", "", "Asia/India/Karnataka/Bangalore/Koramangala/pics", "/", 10, resultCases[29], nil, true},
// Test with prefix and delimiter set to '/'. (60)
{"test-bucket-list-object", "/", "", "/", 10, resultCases[30], nil, true},
// Test with invalid prefix (61)
{"test-bucket-list-object", "\\", "", "/", 10, resultCases[30], ObjectNameInvalid{Bucket: "test-bucket-list-object", Object: "\\"}, false},
{"test-bucket-list-object", "\\", "", "/", 10, ListObjectsInfo{}, ObjectNameInvalid{Bucket: "test-bucket-list-object", Object: "\\"}, false},
// Test listing an empty directory in recursive mode (62)
{"test-bucket-empty-dir", "", "", "", 10, resultCases[31], nil, true},
// Test listing an empty directory in a non recursive mode (63)
{"test-bucket-empty-dir", "", "", "/", 10, resultCases[32], nil, true},
// Test listing a directory which contains an empty directory (64)
{"test-bucket-empty-dir", "", "temporary/", "", 10, resultCases[33], nil, true},
}
for i, testCase := range testCases {
@ -563,6 +604,16 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
}
}
if len(testCase.result.Prefixes) != len(result.Prefixes) {
t.Fatalf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead", i+1, instanceType, len(testCase.result.Prefixes), len(result.Prefixes))
}
for j := 0; j < len(testCase.result.Prefixes); j++ {
if testCase.result.Prefixes[j] != result.Prefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], result.Prefixes[j])
}
}
if testCase.result.IsTruncated != result.IsTruncated {
t.Errorf("Test %d: %s: Expected IsTruncated flag to be %v, but instead found it to be %v", i+1, instanceType, testCase.result.IsTruncated, result.IsTruncated)
}

View File

@ -59,7 +59,7 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string {
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string)
// treeWalk walks directory tree recursively pushing TreeWalkResult into the channel as and when it encounters files.
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh chan struct{}, isEnd bool) (totalNum int, treeErr error) {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -78,7 +78,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
entries := listDir(bucket, prefixDir, entryPrefixMatch)
// For an empty list return right here.
if len(entries) == 0 {
return nil
return 0, nil
}
// example:
@ -90,19 +90,13 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
entries = entries[idx:]
// For an empty list after search through the entries, return right here.
if len(entries) == 0 {
return nil
return 0, nil
}
for i, entry := range entries {
pentry := pathJoin(prefixDir, entry)
isDir := hasSuffix(pentry, slashSeparator)
leaf := !hasSuffix(pentry, slashSeparator)
var leafDir bool
if !leaf {
leafDir = false
}
isDir := !leafDir && !leaf
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.
@ -129,24 +123,31 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
// true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
if err := doTreeWalk(ctx, bucket, pentry, prefixMatch, markerArg, recursive,
listDir, resultCh, endWalkCh, markIsEnd); err != nil {
return err
totalFound, err := doTreeWalk(ctx, bucket, pentry, prefixMatch, markerArg, recursive,
listDir, resultCh, endWalkCh, markIsEnd)
if err != nil {
return 0, err
}
// A nil totalFound means this is an empty directory that
// needs to be sent to the result channel, otherwise continue
// to the next entry.
if totalFound > 0 {
continue
}
}
// EOF is set if we are at last entry and the caller indicated we at the end.
isEOF := ((i == len(entries)-1) && isEnd)
select {
case <-endWalkCh:
return errWalkAbort
return 0, errWalkAbort
case resultCh <- TreeWalkResult{entry: pentry, end: isEOF}:
}
}
// Everything is listed.
return nil
return len(entries), nil
}
// Initiate a new treeWalk in a goroutine.