From 6d5f2a43910b722b43c76f6a896e731978a17118 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Tue, 8 May 2018 19:08:21 -0700 Subject: [PATCH] Better support of empty directories (#5890) Better support of HEAD and listing of zero sized objects with trailing slash (a.k.a empty directory). For that, isLeafDir function is added to indicate if the specified object is an empty directory or not. Each backend (xl, fs) has the responsibility to store that information. Currently, in both of XL & FS, an empty directory is represented by an empty directory in the backend. isLeafDir() checks if the given path is an empty directory or not, since dir listing is costly if the latter contains too many objects, readDirN() is added in this PR to list only N number of entries. In isLeadDir(), we will only list one entry to check if a directory is empty or not. --- cmd/disk-cache.go | 12 +++++- cmd/fs-v1.go | 26 +++++++++++- cmd/naughty-disk_test.go | 4 +- cmd/object-api-common.go | 2 +- cmd/posix-list-dir-nix.go | 19 ++++++++- cmd/posix-list-dir-others.go | 35 +++++++++++++--- cmd/posix-list-dir_test.go | 38 +++++++++++++++++ cmd/posix.go | 9 +++- cmd/posix_test.go | 2 +- cmd/storage-interface.go | 2 +- cmd/storage-rpc-client.go | 7 ++-- cmd/storage-rpc-client_test.go | 4 +- cmd/storage-rpc-server-datatypes.go | 3 ++ cmd/storage-rpc-server.go | 2 +- cmd/tree-walk.go | 32 +++++++++++---- cmd/tree-walk_test.go | 64 ++++++++++++++++++++++++----- cmd/xl-sets.go | 18 ++++---- cmd/xl-v1-common.go | 26 ++++++++++++ cmd/xl-v1-healing.go | 2 +- cmd/xl-v1-list-objects.go | 7 ++-- cmd/xl-v1-multipart.go | 6 +-- cmd/xl-v1-object.go | 3 ++ 22 files changed, 268 insertions(+), 55 deletions(-) diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index fe575d805..d3f4e584f 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -357,8 +357,16 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark return err == nil } + isLeafDir := func(bucket, object string) bool { + fs, err := c.cache.getCacheFS(ctx, bucket, object) + if err != nil { + return false + } + return fs.isObjectDir(bucket, object) + } + listDir := listDirCacheFactory(isLeaf, cacheTreeWalkIgnoredErrs, c.cache.cfs) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } for i := 0; i < maxKeys; { @@ -417,7 +425,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark result = ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index b4527b6f0..79ea9ce75 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -627,6 +627,10 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s return oi, toObjectErr(err, bucket) } + if strings.HasSuffix(object, slashSeparator) && !fs.isObjectDir(bucket, object) { + return oi, errFileNotFound + } + return fs.getObjectInfo(ctx, bucket, object) } @@ -890,6 +894,17 @@ func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc { return listDir } +// isObjectDir returns true if the specified bucket & prefix exists +// and the prefix represents an empty directory. An S3 empty directory +// is also an empty directory in the FS backend. +func (fs *FSObjects) isObjectDir(bucket, prefix string) bool { + entries, err := readDirN(pathJoin(fs.fsPath, bucket, prefix), 1) + if err != nil { + return false + } + return len(entries) == 0 +} + // getObjectETag is a helper function, which returns only the md5sum // of the file on the disk. func (fs *FSObjects) getObjectETag(ctx context.Context, bucket, entry string, lock bool) (string, error) { @@ -1019,8 +1034,15 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de // object string does not end with "/". return !hasSuffix(object, slashSeparator) } + // Return true if the specified object is an empty directory + isLeafDir := func(bucket, object string) bool { + if !hasSuffix(object, slashSeparator) { + return false + } + return fs.isObjectDir(bucket, object) + } listDir := fs.listDirFactory(isLeaf) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } var objInfos []ObjectInfo @@ -1065,7 +1087,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de result := ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 7a6f5b142..774edd7ea 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -108,11 +108,11 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) { return d.disk.DeleteVol(volume) } -func (d *naughtyDisk) ListDir(volume, path string) (entries []string, err error) { +func (d *naughtyDisk) ListDir(volume, path string, count int) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err } - return d.disk.ListDir(volume, path) + return d.disk.ListDir(volume, path, count) } func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) { diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 81e1f0e02..142fd9739 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -116,7 +116,7 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) } // If it's a directory, list and call delFunc() for each entry. - entries, err := storage.ListDir(volume, entryPath) + entries, err := storage.ListDir(volume, entryPath, -1) // If entryPath prefix never existed, safe to ignore. if err == errFileNotFound { return nil diff --git a/cmd/posix-list-dir-nix.go b/cmd/posix-list-dir-nix.go index fd00c8dc2..d896decbd 100644 --- a/cmd/posix-list-dir-nix.go +++ b/cmd/posix-list-dir-nix.go @@ -112,6 +112,12 @@ var readDirBufPool = sync.Pool{ // Return all the entries at the directory dirPath. func readDir(dirPath string) (entries []string, err error) { + return readDirN(dirPath, -1) +} + +// Return count entries at the directory dirPath and all entries +// if count is set to -1 +func readDirN(dirPath string, count int) (entries []string, err error) { bufp := readDirBufPool.Get().(*[]byte) buf := *bufp defer readDirBufPool.Put(bufp) @@ -135,7 +141,11 @@ func readDir(dirPath string) (entries []string, err error) { defer d.Close() fd := int(d.Fd()) - for { + + remaining := count + done := false + + for !done { nbuf, err := syscall.ReadDirent(fd, buf) if err != nil { return nil, err @@ -147,6 +157,13 @@ func readDir(dirPath string) (entries []string, err error) { if tmpEntries, err = parseDirents(dirPath, buf[:nbuf]); err != nil { return nil, err } + if count > 0 { + if remaining <= len(tmpEntries) { + tmpEntries = tmpEntries[:remaining] + done = true + } + remaining -= len(tmpEntries) + } entries = append(entries, tmpEntries...) } return diff --git a/cmd/posix-list-dir-others.go b/cmd/posix-list-dir-others.go index 4cadd79ff..fbc3ed930 100644 --- a/cmd/posix-list-dir-others.go +++ b/cmd/posix-list-dir-others.go @@ -30,7 +30,12 @@ import ( // Return all the entries at the directory dirPath. func readDir(dirPath string) (entries []string, err error) { - d, err := os.Open((dirPath)) + return readDirN(dirPath, -1) +} + +// Return N entries at the directory dirPath. If count is -1, return all entries +func readDirN(dirPath string, count int) (entries []string, err error) { + d, err := os.Open(dirPath) if err != nil { // File is really not found. if os.IsNotExist(err) { @@ -45,20 +50,34 @@ func readDir(dirPath string) (entries []string, err error) { } defer d.Close() - for { - // Read 1000 entries. - fis, err := d.Readdir(1000) + maxEntries := 1000 + if count > 0 && count < maxEntries { + maxEntries = count + } + + done := false + remaining := count + + for !done { + // Read up to max number of entries. + fis, err := d.Readdir(maxEntries) if err != nil { if err == io.EOF { break } return nil, err } + if count > 0 { + if remaining <= len(fis) { + fis = fis[:remaining] + done = true + } + } for _, fi := range fis { // Stat symbolic link and follow to get the final value. if fi.Mode()&os.ModeSymlink == os.ModeSymlink { var st os.FileInfo - st, err = os.Stat((path.Join(dirPath, fi.Name()))) + st, err = os.Stat(path.Join(dirPath, fi.Name())) if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("path", path.Join(dirPath, fi.Name())) ctx := logger.SetReqInfo(context.Background(), reqInfo) @@ -71,6 +90,9 @@ func readDir(dirPath string) (entries []string, err error) { } else if st.Mode().IsRegular() { entries = append(entries, fi.Name()) } + if count > 0 { + remaining-- + } continue } if fi.Mode().IsDir() { @@ -79,6 +101,9 @@ func readDir(dirPath string) (entries []string, err error) { } else if fi.Mode().IsRegular() { entries = append(entries, fi.Name()) } + if count > 0 { + remaining-- + } } } return entries, nil diff --git a/cmd/posix-list-dir_test.go b/cmd/posix-list-dir_test.go index 30d85b862..a704d2894 100644 --- a/cmd/posix-list-dir_test.go +++ b/cmd/posix-list-dir_test.go @@ -201,3 +201,41 @@ func TestReadDir(t *testing.T) { } } } + +func TestReadDirN(t *testing.T) { + testCases := []struct { + numFiles int + n int + expectedNum int + }{ + {0, 0, 0}, + {0, 1, 0}, + {1, 0, 1}, + {0, -1, 0}, + {1, -1, 1}, + {10, -1, 10}, + {1, 1, 1}, + {2, 1, 1}, + {10, 9, 9}, + {10, 10, 10}, + {10, 11, 10}, + } + + for i, testCase := range testCases { + dir := mustSetupDir(t) + defer os.RemoveAll(dir) + + for c := 1; c <= testCase.numFiles; c++ { + if err := ioutil.WriteFile(filepath.Join(dir, fmt.Sprintf("%d", c)), []byte{}, os.ModePerm); err != nil { + t.Fatalf("Unable to create a file, %s", err) + } + } + entries, err := readDirN(dir, testCase.n) + if err != nil { + t.Fatalf("Unable to read entries, %s", err) + } + if len(entries) != testCase.expectedNum { + t.Fatalf("Test %d: unexpected number of entries, waiting for %d, but found %d", i+1, testCase.expectedNum, len(entries)) + } + } +} diff --git a/cmd/posix.go b/cmd/posix.go index 650fc0841..463e74be1 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -467,7 +467,7 @@ func (s *posix) DeleteVol(volume string) (err error) { // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing "/". -func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) { +func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, err error) { defer func() { if err == syscall.EIO { atomic.AddInt32(&s.ioErrCount, 1) @@ -495,7 +495,12 @@ func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) { } return nil, err } - return readDir(pathJoin(volumeDir, dirPath)) + + dirPath = pathJoin(volumeDir, dirPath) + if count > 0 { + return readDirN(dirPath, count) + } + return readDir(dirPath) } // ReadAll reads from r until an error or EOF and returns the data it read. diff --git a/cmd/posix_test.go b/cmd/posix_test.go index b89178a12..54aff5077 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -785,7 +785,7 @@ func TestPosixPosixListDir(t *testing.T) { } else { t.Errorf("Expected the StorageAPI to be of type *posix") } - dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath) + dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath, -1) if err != testCase.expectedErr { t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index d6ed13ab9..575932f90 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -39,7 +39,7 @@ type StorageAPI interface { DeleteVol(volume string) (err error) // File operations. - ListDir(volume, dirPath string) ([]string, error) + ListDir(volume, dirPath string, count int) ([]string, error) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) PrepareFile(volume string, path string, len int64) (err error) AppendFile(volume string, path string, buf []byte) (err error) diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 8454671ae..7d9216d0c 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -283,10 +283,11 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff } // ListDir - list all entries at prefix. -func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { +func (n *networkStorage) ListDir(volume, path string, count int) (entries []string, err error) { if err = n.call("Storage.ListDirHandler", &ListDirArgs{ - Vol: volume, - Path: path, + Vol: volume, + Path: path, + Count: count, }, &entries); err != nil { return nil, err } diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index b762ae899..c15bb8302 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -435,7 +435,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) { t.Error("Unable to initiate MakeVol", err) } } - dirs, err := storageDisk.ListDir("myvol", "") + dirs, err := storageDisk.ListDir("myvol", "", -1) if err != nil { t.Error(err) } @@ -448,7 +448,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) { t.Error("Unable to initiate DeleteVol", err) } } - dirs, err = storageDisk.ListDir("myvol", "") + dirs, err = storageDisk.ListDir("myvol", "", -1) if err != nil { t.Error(err) } diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index b417e3da1..9db35d2fa 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -134,6 +134,9 @@ type ListDirArgs struct { // Name of the path. Path string + + // Number of wanted results + Count int } // RenameFileArgs represents rename file RPC arguments. diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index f29782c1d..1cf9b4c51 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -120,7 +120,7 @@ func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error return err } - entries, err := s.storage.ListDir(args.Vol, args.Path) + entries, err := s.storage.ListDir(args.Vol, args.Path, args.Count) if err != nil { return err } diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index ebdffdde9..5e3c003de 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -98,6 +98,9 @@ type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, // 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json type isLeafFunc func(string, string) bool +// A function isLeafDir of type isLeafDirFunc is used to detect if an entry represents an empty directory. +type isLeafDirFunc func(string, string) bool + func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry string, isLeaf isLeafFunc) ([]string, bool) { // Listing needs to be sorted. sort.Strings(entries) @@ -125,7 +128,7 @@ func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry s } // treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files. -func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { +func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -167,17 +170,30 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker return nil } for i, entry := range entries { + var leaf, leafDir bool + // Decision to do isLeaf check was pushed from listDir() to here. - if delayIsLeaf && isLeaf(bucket, pathJoin(prefixDir, entry)) { - entry = strings.TrimSuffix(entry, slashSeparator) + if delayIsLeaf { + leaf = isLeaf(bucket, pathJoin(prefixDir, entry)) + if leaf { + entry = strings.TrimSuffix(entry, slashSeparator) + } + } else { + leaf = !strings.HasSuffix(entry, slashSeparator) } + if strings.HasSuffix(entry, slashSeparator) { + leafDir = isLeafDir(bucket, pathJoin(prefixDir, entry)) + } + + isDir := !leafDir && !leaf + if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. continue } - if recursive && !hasSuffix(entry, slashSeparator) { + if recursive && !isDir { // We should not skip for recursive listing and if markerDir is a directory // for ex. if marker is "four/five.txt" markerDir will be "four/" which // should not be skipped, instead it will need to be treeWalk()'ed into. @@ -186,7 +202,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker continue } } - if recursive && hasSuffix(entry, slashSeparator) { + if recursive && isDir { // If the entry is a directory, we will need recurse into it. markerArg := "" if entry == markerDir { @@ -198,7 +214,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked // true at the end of the treeWalk stream. markIsEnd := i == len(entries)-1 && isEnd - if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { + if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, markIsEnd); tErr != nil { return tErr } continue @@ -218,7 +234,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker } // Initiate a new treeWalk in a goroutine. -func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, endWalkCh chan struct{}) chan treeWalkResult { +func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, endWalkCh chan struct{}) chan treeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -240,7 +256,7 @@ func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive marker = strings.TrimPrefix(marker, prefixDir) go func() { isEnd := true // Indication to start walking the tree with end as true. - doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, resultCh, endWalkCh, isEnd) + doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, isEnd) close(resultCh) }() return resultCh diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index e1626ca2b..8340e853e 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -128,11 +128,11 @@ func createNamespace(disk StorageAPI, volume string, files []string) error { // Test if tree walker returns entries matching prefix alone are received // when a non empty prefix is supplied. -func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { +func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) { // Start the tree walk go-routine. prefix := "d/" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, endWalkCh) + twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, isLeafDir, endWalkCh) // Check if all entries received on the channel match the prefix. for res := range twResultCh { @@ -143,11 +143,11 @@ func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { } // Test if entries received on tree walk's channel appear after the supplied marker. -func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { +func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) { // Start the tree walk go-routine. prefix := "" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, endWalkCh) + twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, isLeafDir, endWalkCh) // Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel. expectedCount := 3 @@ -187,11 +187,20 @@ func TestTreeWalk(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) // Simple test for prefix based walk. - testTreeWalkPrefix(t, listDir, isLeaf) + testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir) // Simple test when marker is set. - testTreeWalkMarker(t, listDir, isLeaf) + testTreeWalkMarker(t, listDir, isLeaf, isLeafDir) err = os.RemoveAll(fsDir) if err != nil { t.Fatal(err) @@ -222,6 +231,15 @@ func TestTreeWalkTimeout(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) // TreeWalk pool with 2 seconds timeout for tree-walk go routines. @@ -231,7 +249,7 @@ func TestTreeWalkTimeout(t *testing.T) { prefix := "" marker := "" recursive := true - resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) params := listParams{ bucket: volume, @@ -373,6 +391,14 @@ func TestRecursiveTreeWalk(t *testing.T) { return !hasSuffix(prefix, slashSeparator) } + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -450,7 +476,7 @@ func TestRecursiveTreeWalk(t *testing.T) { for i, testCase := range testCases { for entry := range startTreeWalk(context.Background(), volume, testCase.prefix, testCase.marker, testCase.recursive, - listDir, isLeaf, endWalkCh) { + listDir, isLeaf, isLeafDir, endWalkCh) { if _, found := testCase.expected[entry.entry]; !found { t.Errorf("Test %d: Expected %s, but couldn't find", i+1, entry.entry) } @@ -479,6 +505,15 @@ func TestSortedness(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -522,7 +557,7 @@ func TestSortedness(t *testing.T) { var actualEntries []string for entry := range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, - listDir, isLeaf, endWalkCh) { + listDir, isLeaf, isLeafDir, endWalkCh) { actualEntries = append(actualEntries, entry.entry) } if !sort.IsSorted(sort.StringSlice(actualEntries)) { @@ -553,6 +588,15 @@ func TestTreeWalkIsEnd(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -595,7 +639,7 @@ func TestTreeWalkIsEnd(t *testing.T) { } for i, test := range testCases { var entry treeWalkResult - for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, endWalkCh) { + for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, isLeafDir, endWalkCh) { } if entry.entry != test.expectedEntry { t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ebc0e4d42..44e9f4af6 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -623,7 +623,7 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke // Returns function "listDir" of the type listDirFunc. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir(). Sets passes set of disks. -func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc { +func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc { listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) { for _, disk := range disks { if disk == nil { @@ -632,7 +632,7 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredE var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { // For any reason disk was deleted or goes offline, continue // and list from other disks if possible. @@ -723,13 +723,17 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi return s.getHashedSet(entry).isObject(bucket, entry) } + isLeafDir := func(bucket, entry string) bool { + return s.getHashedSet(entry).isObjectDir(bucket, entry) + } + var setDisks = make([][]StorageAPI, len(s.sets)) for _, set := range s.sets { setDisks = append(setDisks, set.getLoadBalancedDisks()) } - listDir := listDirSetsFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, xlTreeWalkIgnoredErrs, setDisks...) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } for i := 0; i < maxKeys; { @@ -781,7 +785,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi result = ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } @@ -1270,7 +1274,7 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc } var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { continue } @@ -1362,7 +1366,7 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de } listDir := listDirSetsHealFactory(isLeaf, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, nil, endWalkCh) } var objInfos []ObjectInfo diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 6de5ec96c..c7bfb818e 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -76,6 +76,32 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { return false } +// isObjectDir returns if the specified path represents an empty directory. +func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { + for _, disk := range xl.getLoadBalancedDisks() { + if disk == nil { + continue + } + // Check if 'prefix' is an object on this 'disk', else continue the check the next disk + ctnts, err := disk.ListDir(bucket, prefix, 1) + if err == nil { + if len(ctnts) == 0 { + return true + } + return false + } + // Ignore for file not found, disk not found or faulty disk. + if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { + continue + } + reqInfo := &logger.ReqInfo{BucketName: bucket} + reqInfo.AppendTags("prefix", prefix) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + } // Exhausted all disks - return false. + return false +} + // Calculate the space occupied by an object in a single disk func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 { numBlocks := fileSize / blockSize diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 68001b0e8..286979412 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -422,7 +422,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o } // List and delete the object directory, - files, derr := disk.ListDir(bucket, object) + files, derr := disk.ListDir(bucket, object, -1) if derr == nil { for _, entry := range files { _ = disk.DeleteFile(bucket, diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index bb3d876ec..696093ccb 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -33,7 +33,7 @@ func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs } var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { // For any reason disk was deleted or goes offline, continue // and list from other disks if possible. @@ -78,8 +78,9 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := xl.isObject + isLeafDir := xl.isObjectDir listDir := listDirFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } var objInfos []ObjectInfo @@ -136,7 +137,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del result := ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 0cff64284..182d24edf 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -156,7 +156,7 @@ func (xl xlObjects) ListMultipartUploads(ctx context.Context, bucket, object, ke if disk == nil { continue } - uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object)) + uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object), -1) if err != nil { if err == errFileNotFound { return result, nil @@ -862,12 +862,12 @@ func (xl xlObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInt // Remove the old multipart uploads on the given disk. func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) { now := time.Now() - shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "") + shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "", -1) if err != nil { return } for _, shaDir := range shaDirs { - uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir) + uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir, -1) if err != nil { continue } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ac0a37fd7..a3f7e9ff1 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -359,6 +359,9 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string) (o } if hasSuffix(object, slashSeparator) { + if !xl.isObjectDir(bucket, object) { + return oi, toObjectErr(errFileNotFound, bucket, object) + } if oi, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil { return oi, toObjectErr(e, bucket, object) }