Moved tree-walk-fs to use tree-walk-pool (#1978)

This commit is contained in:
Krishnan Parthasarathi 2016-06-24 16:41:57 -07:00 committed by Harshavardhana
parent f625392211
commit a3a310cde8
3 changed files with 46 additions and 104 deletions

View File

@ -67,14 +67,15 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
maxUploads = maxUploads - len(uploads)
}
if maxUploads > 0 {
walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walker == nil {
walker = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool {
walkResultCh, endWalkCh := fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
walkResultCh = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool {
return fs.isMultipartUpload(bucket, object)
})
}, endWalkCh)
}
for maxUploads > 0 {
walkResult, ok := <-walker.ch
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true

View File

@ -27,7 +27,6 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/mimedb"
@ -35,10 +34,11 @@ import (
// fsObjects - Implements fs object layer.
type fsObjects struct {
storage StorageAPI
physicalDisk string
listObjectMap map[listParams][]*treeWalkerFS
listObjectMapMutex *sync.Mutex
storage StorageAPI
physicalDisk string
// List pool management.
listPool *treeWalkPool
}
// creates format.json, the FS format info in minioMetaBucket.
@ -117,10 +117,9 @@ func newFSObjects(disk string) (ObjectLayer, error) {
// Return successfully initialized object layer.
return fsObjects{
storage: storage,
physicalDisk: disk,
listObjectMap: make(map[listParams][]*treeWalkerFS),
listObjectMapMutex: &sync.Mutex{},
storage: storage,
physicalDisk: disk,
listPool: newTreeWalkPool(globalLookupTimeout),
}, nil
}
@ -443,17 +442,18 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
recursive = false
}
walker := fs.lookupTreeWalk(listParams{bucket, recursive, marker, prefix})
if walker == nil {
walker = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool {
walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
walkResultCh = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool {
return !strings.HasSuffix(object, slashSeparator)
})
}, endWalkCh)
}
var fileInfos []FileInfo
var eof bool
var nextMarker string
for i := 0; i < maxKeys; {
walkResult, ok := <-walker.ch
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true
@ -481,7 +481,7 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
}
params := listParams{bucket, recursive, nextMarker, prefix}
if !eof {
fs.saveTreeWalk(params, walker)
fs.listPool.Set(params, walkResultCh, endWalkCh)
}
result := ListObjectsInfo{IsTruncated: !eof}

View File

@ -20,26 +20,10 @@ import (
"path"
"sort"
"strings"
"time"
)
// Tree walk notify carries a channel which notifies tree walk
// results, additionally it also carries information if treeWalk
// should be timedOut.
type treeWalkerFS struct {
ch <-chan treeWalkResultFS
timedOut bool
}
// Tree walk result carries results of tree walking.
type treeWalkResultFS struct {
entry string
err error
end bool
}
// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int, isLeaf func(string, string) bool) bool {
func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -56,8 +40,12 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
}
entries, err := fs.storage.ListDir(bucket, prefixDir)
if err != nil {
send(treeWalkResultFS{err: err})
return false
select {
case <-endWalkCh:
return errWalkAbort
case resultCh <- treeWalkResult{err: err}:
return err
}
}
for i, entry := range entries {
@ -77,7 +65,7 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
entries = entries[1:]
}
if len(entries) == 0 {
return true
return nil
}
// example:
// If markerDir="four/" Search() returns the index of "four/" in the sorted
@ -86,12 +74,10 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
return entries[i] >= markerDir
})
entries = entries[idx:]
*count += len(entries)
for i, entry := range entries {
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.
*count--
continue
}
if recursive && !strings.HasSuffix(entry, slashSeparator) {
@ -100,7 +86,6 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
// should not be skipped, instead it will need to be treeWalk()'ed into.
// Skip if it is a file though as it would be listed in previous listing.
*count--
continue
}
}
@ -113,23 +98,27 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
// recursing into "four/"
markerArg = markerBase
}
*count--
prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
return false
markIsEnd := i == len(entries)-1 && isEnd
if tErr := fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
return tErr
}
continue
}
*count--
if !send(treeWalkResultFS{entry: pathJoin(prefixDir, entry)}) {
return false
// EOF is set if we are at last entry and the caller indicated we at the end.
isEOF := ((i == len(entries)-1) && isEnd)
select {
case <-endWalkCh:
return errWalkAbort
case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}:
}
}
return true
// Everything is listed
return nil
}
// Initiate a new treeWalk in a goroutine.
func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalkerFS {
func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@ -140,8 +129,7 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
ch := make(chan treeWalkResultFS, maxObjectList)
walkNotify := treeWalkerFS{ch: ch}
resultCh := make(chan treeWalkResult, maxObjectList)
entryPrefixMatch := prefix
prefixDir := ""
lastIndex := strings.LastIndex(prefix, slashSeparator)
@ -149,58 +137,11 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
entryPrefixMatch = prefix[lastIndex+1:]
prefixDir = prefix[:lastIndex+1]
}
count := 0
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
defer close(ch)
send := func(walkResult treeWalkResultFS) bool {
if count == 0 {
walkResult.end = true
}
timer := time.After(time.Second * 60)
select {
case ch <- walkResult:
return true
case <-timer:
walkNotify.timedOut = true
return false
}
}
fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
isEnd := true // Indication to start walking the tree with end as true.
fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd)
close(resultCh)
}()
return &walkNotify
}
// Save the goroutine reference in the map
func (fs fsObjects) saveTreeWalk(params listParams, walker *treeWalkerFS) {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
walkers, _ := fs.listObjectMap[params]
walkers = append(walkers, walker)
fs.listObjectMap[params] = walkers
}
// Lookup the goroutine reference from map
func (fs fsObjects) lookupTreeWalk(params listParams) *treeWalkerFS {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
if walkChs, ok := fs.listObjectMap[params]; ok {
for i, walkCh := range walkChs {
if !walkCh.timedOut {
newWalkChs := walkChs[i+1:]
if len(newWalkChs) > 0 {
fs.listObjectMap[params] = newWalkChs
} else {
delete(fs.listObjectMap, params)
}
return walkCh
}
}
// As all channels are timed out, delete the map entry
delete(fs.listObjectMap, params)
}
return nil
return resultCh
}