XL/tree-walk: Added comments, changed variable names and structure fields to improve code readability. (#1856)

This commit is contained in:
Krishna Srinivas 2016-06-06 00:25:45 +05:30 committed by Anand Babu (AB) Periasamy
parent 37551a2ad3
commit acc393ba8b
4 changed files with 85 additions and 76 deletions

View File

@ -27,43 +27,53 @@ const (
globalLookupTimeout = time.Minute * 30 // 30minutes.
)
// errWalkAbort - returned by the treeWalker routine, it signals the end of treeWalk.
var errWalkAbort = errors.New("treeWalk abort")
// treeWalkerPoolInfo - tree walker pool info carries temporary walker
// channel stored until timeout is called.
type treeWalkerPoolInfo struct {
treeWalkerCh chan treeWalker
treeWalkerDoneCh chan struct{}
doneCh chan<- struct{}
// listParams - list object params used for list object map
type listParams struct {
bucket string
recursive bool
marker string
prefix string
}
// treeWalkerPool - tree walker pool is a set of temporary tree walker
// objects. Any item stored in the pool will be removed automatically at
// a given timeOut value. This pool is safe for use by multiple
// goroutines simultaneously. pool's purpose is to cache tree walker
// channels for later reuse.
type treeWalkerPool struct {
pool map[listParams][]treeWalkerPoolInfo
// errWalkAbort - returned by doTreeWalk() if it returns prematurely.
// doTreeWalk() can return prematurely if
// 1) treeWalk is timed out by the timer go-routine.
// 2) there is an error during tree walk.
var errWalkAbort = errors.New("treeWalk abort")
// treeWalk - represents the go routine that does the file tree walk.
type treeWalk struct {
resultCh chan treeWalkResult
endWalkCh chan struct{} // To signal when treeWalk go-routine should end.
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
// treeWalkPool - pool of treeWalk go routines.
// A treeWalk is added to the pool by Set() and removed either by
// doing a Release() or if the concerned timer goes off.
// treeWalkPool's purpose is to maintain active treeWalk go-routines in a map so that
// it can be looked up across related list calls.
type treeWalkPool struct {
pool map[listParams][]treeWalk
timeOut time.Duration
lock *sync.Mutex
}
// newTreeWalkerPool - initialize new tree walker pool.
func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool {
tPool := &treeWalkerPool{
pool: make(map[listParams][]treeWalkerPoolInfo),
// newTreeWalkPool - initialize new tree walk pool.
func newTreeWalkPool(timeout time.Duration) *treeWalkPool {
tPool := &treeWalkPool{
pool: make(map[listParams][]treeWalk),
timeOut: timeout,
lock: &sync.Mutex{},
}
return tPool
}
// Release - selects an item from the pool based on the input
// listParams, removes it from the pool, and returns treeWalker
// channels. Release will return nil, if listParams is not
// recognized.
func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) {
// Release - selects a treeWalk from the pool based on the input
// listParams, removes it from the pool, and returns the treeWalkResult
// channel.
// Returns nil if listParams does not have an asccociated treeWalk.
func (t treeWalkPool) Release(params listParams) (resultCh chan treeWalkResult, endWalkCh chan struct{}) {
t.lock.Lock()
defer t.lock.Unlock()
walks, ok := t.pool[params] // Pick the valid walks.
@ -77,40 +87,45 @@ func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker
} else {
delete(t.pool, params)
}
walk.doneCh <- struct{}{}
return walk.treeWalkerCh, walk.treeWalkerDoneCh
walk.endTimerCh <- struct{}{}
return walk.resultCh, walk.endWalkCh
}
}
// Release return nil if params not found.
return nil, nil
}
// Set - adds new list params along with treeWalker channel to the
// pool for future. Additionally this also starts a go routine which
// waits at the configured timeout. Additionally this go-routine is
// also closed pro-actively by 'Release' call when the treeWalker
// item is obtained from the pool.
func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) {
// Set - adds a treeWalk to the treeWalkPool.
// Also starts a timer go-routine that ends when:
// 1) time.After() expires after t.timeOut seconds.
// The expiration is needed so that the treeWalk go-routine resources are freed after a timeout
// if the S3 client does only partial listing of objects.
// 2) Relase() signals the timer go-routine to end on endTimerCh.
// During listing the timer should not timeout and end the treeWalk go-routine, hence the
// timer go-routine should be ended.
func (t treeWalkPool) Set(params listParams, resultCh chan treeWalkResult, endWalkCh chan struct{}) {
t.lock.Lock()
defer t.lock.Unlock()
// Should be a buffered channel so that Release() never blocks.
var doneCh = make(chan struct{}, 1)
walkInfo := treeWalkerPoolInfo{
treeWalkerCh: treeWalkerCh,
treeWalkerDoneCh: treeWalkerDoneCh,
doneCh: doneCh,
endTimerCh := make(chan struct{}, 1)
walkInfo := treeWalk{
resultCh: resultCh,
endWalkCh: endWalkCh,
endTimerCh: endTimerCh,
}
// Append new walk info.
t.pool[params] = append(t.pool[params], walkInfo)
// Safe expiry of treeWalkerCh after timeout.
go func(doneCh <-chan struct{}) {
// Timer go-routine which times out after t.timeOut seconds.
go func(endTimerCh <-chan struct{}) {
select {
// Wait until timeOut
case <-time.After(t.timeOut):
// Timeout has expired. Remove the treeWalk from treeWalkPool and
// end the treeWalk go-routine.
t.lock.Lock()
walks, ok := t.pool[params] // Look for valid walks.
walks, ok := t.pool[params]
if ok {
// Look for walkInfo, remove it from the walks list.
for i, walk := range walks {
@ -118,19 +133,21 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre
walks = append(walks[:i], walks[i+1:]...)
}
}
// Walks is empty we have no more pending requests.
// Remove map entry.
if len(walks) == 0 {
// No more treeWalk go-routines associated with listParams
// hence remove map entry.
delete(t.pool, params)
} else { // Save the updated walks.
} else {
// There are more treeWalk go-routines associated with listParams
// hence save the list in the map.
t.pool[params] = walks
}
}
// Close tree walker for the backing go-routine to die.
close(treeWalkerDoneCh)
// Signal the treeWalk go-routine to die.
close(endWalkCh)
t.lock.Unlock()
case <-doneCh:
case <-endTimerCh:
return
}
}(doneCh)
}(endTimerCh)
}

View File

@ -21,16 +21,8 @@ import (
"strings"
)
// listParams - list object params used for list object map
type listParams struct {
bucket string
recursive bool
marker string
prefix string
}
// Tree walk result carries results of tree walking.
type treeWalker struct {
type treeWalkResult struct {
entry string
err error
end bool
@ -80,7 +72,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
}
// treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, isEnd bool) error {
func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -100,9 +92,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
}, isLeaf)
if err != nil {
select {
case <-doneCh:
case <-endWalkCh:
return errWalkAbort
case treeWalkCh <- treeWalker{err: err}:
case resultCh <- treeWalkResult{err: err}:
return err
}
}
@ -149,7 +141,7 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
// true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
if tErr := xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, markIsEnd); tErr != nil {
if tErr := xl.doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
return tErr
}
continue
@ -157,9 +149,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
// EOF is set if we are at last entry and the caller indicated we at the end.
isEOF := ((i == len(entries)-1) && isEnd)
select {
case <-doneCh:
case <-endWalkCh:
return errWalkAbort
case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}:
case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}:
}
}
@ -168,7 +160,7 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
}
// Initiate a new treeWalk in a goroutine.
func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, doneCh chan struct{}) chan treeWalker {
func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@ -179,7 +171,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
treeWalkCh := make(chan treeWalker, maxObjectList)
resultCh := make(chan treeWalkResult, maxObjectList)
entryPrefixMatch := prefix
prefixDir := ""
lastIndex := strings.LastIndex(prefix, slashSeparator)
@ -190,8 +182,8 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
isEnd := true // Indication to start walking the tree with end as true.
xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, isEnd)
close(treeWalkCh)
xl.doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd)
close(resultCh)
}()
return treeWalkCh
return resultCh
}

View File

@ -26,17 +26,17 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
recursive = false
}
walkerCh, walkerDoneCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkerCh == nil {
walkerDoneCh = make(chan struct{})
walkerCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, walkerDoneCh)
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
walkResultCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, endWalkCh)
}
var objInfos []ObjectInfo
var eof bool
var nextMarker string
for i := 0; i < maxKeys; {
walkResult, ok := <-walkerCh
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true
@ -76,7 +76,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
params := listParams{bucket, recursive, nextMarker, prefix}
if !eof {
xl.listPool.Set(params, walkerCh, walkerDoneCh)
xl.listPool.Set(params, walkResultCh, endWalkCh)
}
result := ListObjectsInfo{IsTruncated: !eof}

View File

@ -44,7 +44,7 @@ type xlObjects struct {
writeQuorum int // writeQuorum minimum required disks to write data.
// List pool management.
listPool *treeWalkerPool
listPool *treeWalkPool
}
// errXLMaxDisks - returned for reached maximum of disks.
@ -161,7 +161,7 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
storageDisks: newPosixDisks,
dataBlocks: dataBlocks,
parityBlocks: parityBlocks,
listPool: newTreeWalkerPool(globalLookupTimeout),
listPool: newTreeWalkPool(globalLookupTimeout),
}
// Figure out read and write quorum based on number of storage disks.