mirror of
https://github.com/minio/minio.git
synced 2025-01-25 21:53:16 -05:00
fix: bring back delayed leaf detection in listing (#10346)
This commit is contained in:
parent
17a1eda702
commit
d19b434ffc
@ -36,7 +36,7 @@ import (
|
||||
const (
|
||||
// RFC3339 a subset of the ISO8601 timestamp format. e.g 2014-04-29T18:30:38Z
|
||||
iso8601TimeFormat = "2006-01-02T15:04:05.000Z" // Reply date format with nanosecond precision.
|
||||
maxObjectList = 10000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
|
||||
maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
|
||||
maxDeleteList = 10000 // Limit number of objects deleted in a delete call.
|
||||
maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
|
||||
maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
|
||||
|
@ -838,9 +838,17 @@ func (f *FileInfoCh) Push(fi FileInfo) {
|
||||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
||||
// N times until this boolean is 'false'.
|
||||
func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
|
||||
for i := range entryChs {
|
||||
entries[i], entriesValid[i] = entryChs[i].Pop()
|
||||
var wg sync.WaitGroup
|
||||
for j := range entryChs {
|
||||
j := j
|
||||
wg.Add(1)
|
||||
// Pop() entries in parallel for large drive setups.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
entries[j], entriesValid[j] = entryChs[j].Pop()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var isTruncated = false
|
||||
for _, valid := range entriesValid {
|
||||
@ -902,9 +910,17 @@ func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesVali
|
||||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
||||
// N times until this boolean is 'false'.
|
||||
func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) {
|
||||
for i := range entryChs {
|
||||
entries[i], entriesValid[i] = entryChs[i].Pop()
|
||||
var wg sync.WaitGroup
|
||||
for j := range entryChs {
|
||||
j := j
|
||||
wg.Add(1)
|
||||
// Pop() entries in parallel for large drive setups.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
entries[j], entriesValid[j] = entryChs[j].Pop()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var isTruncated = false
|
||||
for _, valid := range entriesValid {
|
||||
|
22
cmd/fs-v1.go
22
cmd/fs-v1.go
@ -1334,23 +1334,31 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op
|
||||
return ObjectInfo{Bucket: bucket, Name: object}, nil
|
||||
}
|
||||
|
||||
func (fs *FSObjects) isLeafDir(bucket string, leafPath string) bool {
|
||||
return fs.isObjectDir(bucket, leafPath)
|
||||
}
|
||||
|
||||
func (fs *FSObjects) isLeaf(bucket string, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// isLeaf - is used by listDir function to check if an entry
|
||||
// is a leaf or non-leaf entry.
|
||||
func (fs *FSObjects) listDirFactory() ListDirFunc {
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
var err error
|
||||
entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
|
||||
if err != nil && err != errFileNotFound {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
return false, nil
|
||||
return false, nil, false
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
sort.Strings(entries)
|
||||
return false, filterMatchingPrefix(entries, prefixEntry)
|
||||
entries, delayIsLeaf = filterListEntries(bucket, prefixDir, entries, prefixEntry, fs.isLeaf)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
|
||||
// Return list factory instance.
|
||||
@ -1453,7 +1461,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
|
||||
}()
|
||||
|
||||
return listObjects(ctx, fs, bucket, prefix, marker, delimiter, maxKeys, fs.listPool,
|
||||
fs.listDirFactory(), fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||
fs.listDirFactory(), fs.isLeaf, fs.isLeafDir, fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||
}
|
||||
|
||||
// GetObjectTags - get object tags from an existing object
|
||||
@ -1550,7 +1558,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
||||
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), fs.isLeaf, fs.isLeafDir, results, fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||
}
|
||||
|
||||
// HealObjects - no-op for fs. Valid only for Erasure.
|
||||
|
@ -48,8 +48,8 @@ var (
|
||||
// ListObjects function alias.
|
||||
ListObjects = listObjects
|
||||
|
||||
// FilterMatchingPrefix function alias.
|
||||
FilterMatchingPrefix = filterMatchingPrefix
|
||||
// FilterListEntries function alias.
|
||||
FilterListEntries = filterListEntries
|
||||
|
||||
// IsStringEqual is string equal.
|
||||
IsStringEqual = isStringEqual
|
||||
|
@ -348,9 +348,17 @@ func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketIn
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) isLeafDir(bucket, leafPath string) bool {
|
||||
return n.isObjectDir(context.Background(), bucket, leafPath)
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) isLeaf(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, hdfsSeparator)
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
f, err := n.clnt.Open(n.hdfsPathJoin(bucket, prefixDir))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@ -366,7 +374,7 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
|
||||
return
|
||||
}
|
||||
if len(fis) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if fi.IsDir() {
|
||||
@ -375,7 +383,8 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
|
||||
entries = append(entries, fi.Name())
|
||||
}
|
||||
}
|
||||
return false, minio.FilterMatchingPrefix(entries, prefixEntry)
|
||||
entries, delayIsLeaf = minio.FilterListEntries(bucket, prefixDir, entries, prefixEntry, n.isLeaf)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
|
||||
// Return list factory instance.
|
||||
@ -426,7 +435,7 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
|
||||
return objectInfo, nil
|
||||
}
|
||||
|
||||
return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), getObjectInfo, getObjectInfo)
|
||||
return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), n.isLeaf, n.isLeafDir, getObjectInfo, getObjectInfo)
|
||||
}
|
||||
|
||||
// Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps
|
||||
|
@ -139,11 +139,11 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string)
|
||||
return err
|
||||
}
|
||||
|
||||
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
recursive := true
|
||||
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, endWalkCh)
|
||||
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
|
||||
var objInfos []ObjectInfo
|
||||
var eof bool
|
||||
@ -227,14 +227,14 @@ func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func fsWalk(ctx context.Context, obj ObjectLayer, bucket, prefix string, listDir ListDirFunc, results chan<- ObjectInfo, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) error {
|
||||
func fsWalk(ctx context.Context, obj ObjectLayer, bucket, prefix string, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, results chan<- ObjectInfo, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, "", obj); err != nil {
|
||||
// Upon error close the channel.
|
||||
close(results)
|
||||
return err
|
||||
}
|
||||
|
||||
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", true, listDir, ctx.Done())
|
||||
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", true, listDir, isLeaf, isLeafDir, ctx.Done())
|
||||
|
||||
go func() {
|
||||
defer close(results)
|
||||
@ -277,9 +277,9 @@ func fsWalk(ctx context.Context, obj ObjectLayer, bucket, prefix string, listDir
|
||||
return nil
|
||||
}
|
||||
|
||||
func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
if delimiter != SlashSeparator && delimiter != "" {
|
||||
return listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, getObjInfo, getObjectInfoDirs...)
|
||||
return listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, isLeaf, isLeafDir, getObjInfo, getObjectInfoDirs...)
|
||||
}
|
||||
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, obj); err != nil {
|
||||
@ -322,7 +322,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
|
||||
walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix})
|
||||
if walkResultCh == nil {
|
||||
endWalkCh = make(chan struct{})
|
||||
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh)
|
||||
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
}
|
||||
|
||||
var objInfos []ObjectInfo
|
||||
|
@ -30,10 +30,10 @@ import (
|
||||
// refer https://github.com/golang/go/issues/24015
|
||||
const blockSize = 8 << 10 // 8192
|
||||
|
||||
// By default atleast 1000 entries in single getdents call
|
||||
// By default atleast 20 entries in single getdents call
|
||||
var direntPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
buf := make([]byte, blockSize*1000)
|
||||
buf := make([]byte, blockSize*20)
|
||||
return &buf
|
||||
},
|
||||
}
|
||||
|
111
cmd/tree-walk.go
111
cmd/tree-walk.go
@ -51,15 +51,84 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string {
|
||||
}
|
||||
end--
|
||||
}
|
||||
sort.Strings(entries[start:end])
|
||||
return entries[start:end]
|
||||
}
|
||||
|
||||
// xl.ListDir returns entries with trailing "/" for directories. At the object layer
|
||||
// we need to remove this trailing "/" for objects and retain "/" for prefixes before
|
||||
// sorting because the trailing "/" can affect the sorting results for certain cases.
|
||||
// Ex. lets say entries = ["a-b/", "a/"] and both are objects.
|
||||
// sorting with out trailing "/" = ["a", "a-b"]
|
||||
// sorting with trailing "/" = ["a-b/", "a/"]
|
||||
// Hence if entries[] does not have a case like the above example then isLeaf() check
|
||||
// can be delayed till the entry is pushed into the TreeWalkResult channel.
|
||||
// delayIsLeafCheck() returns true if isLeaf can be delayed or false if
|
||||
// isLeaf should be done in listDir()
|
||||
func delayIsLeafCheck(entries []string) bool {
|
||||
for i, entry := range entries {
|
||||
if i == len(entries)-1 {
|
||||
break
|
||||
}
|
||||
// If any byte in the "entry" string is less than '/' then the
|
||||
// next "entry" should not contain '/' at the same same byte position.
|
||||
for j := 0; j < len(entry); j++ {
|
||||
if entry[j] < '/' {
|
||||
if len(entries[i+1]) > j {
|
||||
if entries[i+1][j] == '/' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// ListDirFunc - "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
|
||||
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string)
|
||||
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string, delayIsLeaf bool)
|
||||
|
||||
// IsLeafFunc - A function isLeaf of type isLeafFunc is used to detect if an
|
||||
// entry is a leaf entry. There are 2 scenarios where isLeaf should behave
|
||||
// differently depending on the backend:
|
||||
// 1. FS backend object listing - isLeaf is true if the entry
|
||||
// has no trailing "/"
|
||||
// 2. Erasure backend object listing - isLeaf is true if the entry
|
||||
// is a directory and contains xl.meta
|
||||
type IsLeafFunc func(string, string) bool
|
||||
|
||||
// IsLeafDirFunc - A function isLeafDir of type isLeafDirFunc is used to detect
|
||||
// if an entry is empty directory.
|
||||
type IsLeafDirFunc func(string, string) bool
|
||||
|
||||
func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry string, isLeaf IsLeafFunc) ([]string, bool) {
|
||||
// Listing needs to be sorted.
|
||||
sort.Strings(entries)
|
||||
|
||||
// Filter entries that have the prefix prefixEntry.
|
||||
entries = filterMatchingPrefix(entries, prefixEntry)
|
||||
|
||||
// Can isLeaf() check be delayed till when it has to be sent down the
|
||||
// TreeWalkResult channel?
|
||||
delayIsLeaf := delayIsLeafCheck(entries)
|
||||
if delayIsLeaf {
|
||||
return entries, true
|
||||
}
|
||||
|
||||
// isLeaf() check has to happen here so that trailing "/" for objects can be removed.
|
||||
for i, entry := range entries {
|
||||
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
}
|
||||
|
||||
// Sort again after removing trailing "/" for objects as the previous sort
|
||||
// does not hold good anymore.
|
||||
sort.Strings(entries)
|
||||
return entries, false
|
||||
}
|
||||
|
||||
// treeWalk walks directory tree recursively pushing TreeWalkResult into the channel as and when it encounters files.
|
||||
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (emptyDir bool, treeErr error) {
|
||||
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (emptyDir bool, treeErr error) {
|
||||
// Example:
|
||||
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
|
||||
// called with prefixDir="one/two/three/four/" and marker="five.txt"
|
||||
@ -75,7 +144,12 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
}
|
||||
}
|
||||
|
||||
emptyDir, entries := listDir(bucket, prefixDir, entryPrefixMatch)
|
||||
emptyDir, entries, delayIsLeaf := listDir(bucket, prefixDir, entryPrefixMatch)
|
||||
// When isleaf check is delayed, make sure that it is set correctly here.
|
||||
if delayIsLeaf && isLeaf == nil || isLeafDir == nil {
|
||||
return false, errInvalidArgument
|
||||
}
|
||||
|
||||
// For an empty list return right here.
|
||||
if emptyDir {
|
||||
return true, nil
|
||||
@ -94,8 +168,23 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
}
|
||||
|
||||
for i, entry := range entries {
|
||||
pentry := pathJoin(prefixDir, entry)
|
||||
isDir := HasSuffix(pentry, SlashSeparator)
|
||||
var leaf, leafDir bool
|
||||
|
||||
// Decision to do isLeaf check was pushed from listDir() to here.
|
||||
if delayIsLeaf {
|
||||
leaf = isLeaf(bucket, pathJoin(prefixDir, entry))
|
||||
if leaf {
|
||||
entry = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
} else {
|
||||
leaf = !strings.HasSuffix(entry, slashSeparator)
|
||||
}
|
||||
|
||||
if strings.HasSuffix(entry, slashSeparator) {
|
||||
leafDir = isLeafDir(bucket, pathJoin(prefixDir, entry))
|
||||
}
|
||||
|
||||
isDir := !leafDir && !leaf
|
||||
|
||||
if i == 0 && markerDir == entry {
|
||||
if !recursive {
|
||||
@ -123,8 +212,8 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
|
||||
// true at the end of the treeWalk stream.
|
||||
markIsEnd := i == len(entries)-1 && isEnd
|
||||
emptyDir, err := doTreeWalk(ctx, bucket, pentry, prefixMatch, markerArg, recursive,
|
||||
listDir, resultCh, endWalkCh, markIsEnd)
|
||||
emptyDir, err := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive,
|
||||
listDir, isLeaf, isLeafDir, resultCh, endWalkCh, markIsEnd)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -142,7 +231,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
select {
|
||||
case <-endWalkCh:
|
||||
return false, errWalkAbort
|
||||
case resultCh <- TreeWalkResult{entry: pentry, end: isEOF}:
|
||||
case resultCh <- TreeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}:
|
||||
}
|
||||
}
|
||||
|
||||
@ -151,7 +240,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
}
|
||||
|
||||
// Initiate a new treeWalk in a goroutine.
|
||||
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, endWalkCh <-chan struct{}) chan TreeWalkResult {
|
||||
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, endWalkCh <-chan struct{}) chan TreeWalkResult {
|
||||
// Example 1
|
||||
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
|
||||
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
|
||||
@ -173,7 +262,7 @@ func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive
|
||||
marker = strings.TrimPrefix(marker, prefixDir)
|
||||
go func() {
|
||||
isEnd := true // Indication to start walking the tree with end as true.
|
||||
doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, resultCh, endWalkCh, isEnd)
|
||||
doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, isEnd)
|
||||
close(resultCh)
|
||||
}()
|
||||
return resultCh
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -85,27 +86,27 @@ func createNamespace(disk StorageAPI, volume string, files []string) error {
|
||||
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// disks - used for doing disk.ListDir()
|
||||
func listDirFactory(ctx context.Context, disk StorageAPI) ListDirFunc {
|
||||
return func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
|
||||
func listDirFactory(ctx context.Context, disk StorageAPI, isLeaf IsLeafFunc) ListDirFunc {
|
||||
return func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
entries, err := disk.ListDir(volume, dirPath, -1)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
return false, nil, false
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
sort.Strings(entries)
|
||||
return false, filterMatchingPrefix(entries, dirEntry)
|
||||
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, isLeaf)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
}
|
||||
|
||||
// Test if tree walker returns entries matching prefix alone are received
|
||||
// when a non empty prefix is supplied.
|
||||
func testTreeWalkPrefix(t *testing.T, listDir ListDirFunc) {
|
||||
func testTreeWalkPrefix(t *testing.T, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc) {
|
||||
// Start the tree walk go-routine.
|
||||
prefix := "d/"
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, endWalkCh)
|
||||
twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
|
||||
// Check if all entries received on the channel match the prefix.
|
||||
for res := range twResultCh {
|
||||
@ -116,11 +117,11 @@ func testTreeWalkPrefix(t *testing.T, listDir ListDirFunc) {
|
||||
}
|
||||
|
||||
// Test if entries received on tree walk's channel appear after the supplied marker.
|
||||
func testTreeWalkMarker(t *testing.T, listDir ListDirFunc) {
|
||||
func testTreeWalkMarker(t *testing.T, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc) {
|
||||
// Start the tree walk go-routine.
|
||||
prefix := ""
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, endWalkCh)
|
||||
twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
|
||||
// Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel.
|
||||
expectedCount := 3
|
||||
@ -157,12 +158,22 @@ func TestTreeWalk(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), disk)
|
||||
isLeaf := func(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, leafPath string) bool {
|
||||
entries, _ := disk.ListDir(bucket, leafPath, 1)
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), disk, isLeaf)
|
||||
|
||||
// Simple test for prefix based walk.
|
||||
testTreeWalkPrefix(t, listDir)
|
||||
testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir)
|
||||
|
||||
// Simple test when marker is set.
|
||||
testTreeWalkMarker(t, listDir)
|
||||
testTreeWalkMarker(t, listDir, isLeaf, isLeafDir)
|
||||
|
||||
err = os.RemoveAll(fsDir)
|
||||
if err != nil {
|
||||
@ -191,7 +202,16 @@ func TestTreeWalkTimeout(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), disk)
|
||||
isLeaf := func(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, leafPath string) bool {
|
||||
entries, _ := disk.ListDir(bucket, leafPath, 1)
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), disk, isLeaf)
|
||||
|
||||
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
|
||||
pool := NewTreeWalkPool(2 * time.Second)
|
||||
@ -200,7 +220,7 @@ func TestTreeWalkTimeout(t *testing.T) {
|
||||
prefix := ""
|
||||
marker := ""
|
||||
recursive := true
|
||||
resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, endWalkCh)
|
||||
resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
|
||||
params := listParams{
|
||||
bucket: volume,
|
||||
@ -250,8 +270,17 @@ func TestRecursiveTreeWalk(t *testing.T) {
|
||||
t.Fatalf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
isLeaf := func(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, leafPath string) bool {
|
||||
entries, _ := disk1.ListDir(bucket, leafPath, 1)
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), disk1)
|
||||
listDir := listDirFactory(context.Background(), disk1, isLeaf)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
@ -329,7 +358,7 @@ func TestRecursiveTreeWalk(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
|
||||
for entry := range startTreeWalk(context.Background(), volume,
|
||||
testCase.prefix, testCase.marker, testCase.recursive,
|
||||
listDir, endWalkCh) {
|
||||
listDir, isLeaf, isLeafDir, endWalkCh) {
|
||||
if _, found := testCase.expected[entry.entry]; !found {
|
||||
t.Errorf("Expected %s, but couldn't find", entry.entry)
|
||||
}
|
||||
@ -355,8 +384,17 @@ func TestSortedness(t *testing.T) {
|
||||
t.Fatalf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
isLeaf := func(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, leafPath string) bool {
|
||||
entries, _ := disk1.ListDir(bucket, leafPath, 1)
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), disk1)
|
||||
listDir := listDirFactory(context.Background(), disk1, isLeaf)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
@ -398,7 +436,7 @@ func TestSortedness(t *testing.T) {
|
||||
var actualEntries []string
|
||||
for entry := range startTreeWalk(context.Background(), volume,
|
||||
test.prefix, test.marker, test.recursive,
|
||||
listDir, endWalkCh) {
|
||||
listDir, isLeaf, isLeafDir, endWalkCh) {
|
||||
actualEntries = append(actualEntries, entry.entry)
|
||||
}
|
||||
if !sort.IsSorted(sort.StringSlice(actualEntries)) {
|
||||
@ -426,8 +464,17 @@ func TestTreeWalkIsEnd(t *testing.T) {
|
||||
t.Fatalf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
isLeaf := func(bucket, leafPath string) bool {
|
||||
return !strings.HasSuffix(leafPath, slashSeparator)
|
||||
}
|
||||
|
||||
isLeafDir := func(bucket, leafPath string) bool {
|
||||
entries, _ := disk1.ListDir(bucket, leafPath, 1)
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), disk1)
|
||||
listDir := listDirFactory(context.Background(), disk1, isLeaf)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
@ -469,7 +516,7 @@ func TestTreeWalkIsEnd(t *testing.T) {
|
||||
for i, test := range testCases {
|
||||
var entry TreeWalkResult
|
||||
for entry = range startTreeWalk(context.Background(), volume, test.prefix,
|
||||
test.marker, test.recursive, listDir, endWalkCh) {
|
||||
test.marker, test.recursive, listDir, isLeaf, isLeafDir, endWalkCh) {
|
||||
}
|
||||
if entry.entry != test.expectedEntry {
|
||||
t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry)
|
||||
|
@ -28,10 +28,10 @@ import (
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
slashpath "path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -733,8 +733,6 @@ func (s *xlStorage) ListDirSplunk(volume, dirPath string, count int) (entries []
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
const receiptJSON = "receipt.json"
|
||||
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
defer func() {
|
||||
atomic.AddInt32(&s.activeIOCount, -1)
|
||||
@ -765,24 +763,45 @@ func (s *xlStorage) ListDirSplunk(volume, dirPath string, count int) (entries []
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, entry := range entries {
|
||||
if entry != receiptJSON {
|
||||
continue
|
||||
}
|
||||
_, err = os.Stat(pathJoin(dirPathAbs, entry, xlStorageFormatFile))
|
||||
if err == nil {
|
||||
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
|
||||
continue
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
if err = s.renameLegacyMetadata(volume, pathJoin(dirPath, entry)); err == nil {
|
||||
// Rename was successful means we found old `xl.json`
|
||||
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
|
||||
}
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
func (s *xlStorage) isLeafSplunk(volume string, leafPath string) bool {
|
||||
const receiptJSON = "receipt.json"
|
||||
|
||||
if path.Base(leafPath) != receiptJSON {
|
||||
return false
|
||||
}
|
||||
return s.isLeaf(volume, leafPath)
|
||||
}
|
||||
|
||||
func (s *xlStorage) isLeaf(volume string, leafPath string) bool {
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, err = os.Stat(pathJoin(volumeDir, leafPath, xlStorageFormatFile))
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
// We need a fallback code where directory might contain
|
||||
// legacy `xl.json`, in such situation we just rename
|
||||
// and proceed if rename is successful we know that it
|
||||
// is the leaf since `xl.json` was present.
|
||||
return s.renameLegacyMetadata(volume, leafPath) == nil
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *xlStorage) isLeafDir(volume, leafPath string) bool {
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return isDirEmpty(pathJoin(volumeDir, leafPath))
|
||||
}
|
||||
|
||||
// WalkSplunk - is a sorted walker which returns file entries in lexically
|
||||
@ -810,19 +829,19 @@ func (s *xlStorage) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan
|
||||
ch = make(chan FileInfo)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
listDir := func(volume, dirPath, dirEntry string) (bool, []string) {
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
entries, err := s.ListDirSplunk(volume, dirPath, -1)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
return false, nil, false
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
sort.Strings(entries)
|
||||
return false, filterMatchingPrefix(entries, dirEntry)
|
||||
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeafSplunk)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, endWalkCh)
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh)
|
||||
for {
|
||||
walkResult, ok := <-walkResultCh
|
||||
if !ok {
|
||||
@ -895,23 +914,22 @@ func (s *xlStorage) WalkVersions(volume, dirPath, marker string, recursive bool,
|
||||
}
|
||||
}
|
||||
|
||||
// buffer channel matches the S3 ListObjects implementation
|
||||
ch = make(chan FileInfoVersions, maxObjectList)
|
||||
ch = make(chan FileInfoVersions)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
entries, err := s.ListDir(volume, dirPath, -1)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
return false, nil, false
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
sort.Strings(entries)
|
||||
return false, filterMatchingPrefix(entries, dirEntry)
|
||||
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, endWalkCh)
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
|
||||
for walkResult := range walkResultCh {
|
||||
var fiv FileInfoVersions
|
||||
if HasSuffix(walkResult.entry, SlashSeparator) {
|
||||
@ -981,23 +999,22 @@ func (s *xlStorage) Walk(volume, dirPath, marker string, recursive bool, endWalk
|
||||
}
|
||||
}
|
||||
|
||||
// buffer channel matches the S3 ListObjects implementation
|
||||
ch = make(chan FileInfo, maxObjectList)
|
||||
ch = make(chan FileInfo)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
entries, err := s.ListDir(volume, dirPath, -1)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
return false, nil, false
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil
|
||||
return true, nil, false
|
||||
}
|
||||
sort.Strings(entries)
|
||||
return false, filterMatchingPrefix(entries, dirEntry)
|
||||
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf)
|
||||
return false, entries, delayIsLeaf
|
||||
}
|
||||
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, endWalkCh)
|
||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
|
||||
for walkResult := range walkResultCh {
|
||||
var fi FileInfo
|
||||
if HasSuffix(walkResult.entry, SlashSeparator) {
|
||||
@ -1066,20 +1083,6 @@ func (s *xlStorage) ListDir(volume, dirPath string, count int) (entries []string
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, entry := range entries {
|
||||
_, err = os.Stat(pathJoin(dirPathAbs, entry, xlStorageFormatFile))
|
||||
if err == nil {
|
||||
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
|
||||
continue
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
if err = s.renameLegacyMetadata(volume, pathJoin(dirPath, entry)); err == nil {
|
||||
// if rename was successful, means we did find old `xl.json`
|
||||
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user