From 1cf1532ca33dc1aab7d854f706868a70e8dc6413 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 29 May 2016 21:05:00 -0700 Subject: [PATCH] XL: Implement ListObjects channel and pool management. --- object-api-multipart_test.go | 1 + tree-walk-pool.go | 118 +++++++++++++++++++++++++++++++ tree-walk-xl.go | 130 +++++++++++++---------------------- xl-v1-list-objects.go | 18 ++--- xl-v1-multipart.go | 17 +++-- xl-v1.go | 15 ++-- 6 files changed, 190 insertions(+), 109 deletions(-) create mode 100644 tree-walk-pool.go diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go index e6abb0095..a11ed2757 100644 --- a/object-api-multipart_test.go +++ b/object-api-multipart_test.go @@ -1056,6 +1056,7 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t *testing.T } for i, testCase := range testCases { + // fmt.Println(testCase) // uncomment to peek into the test cases. actualResult, actualErr := obj.ListMultipartUploads(testCase.bucket, testCase.prefix, testCase.keyMarker, testCase.uploadIDMarker, testCase.delimiter, testCase.maxUploads) if actualErr != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, actualErr.Error()) diff --git a/tree-walk-pool.go b/tree-walk-pool.go new file mode 100644 index 000000000..11e957126 --- /dev/null +++ b/tree-walk-pool.go @@ -0,0 +1,118 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "sync" + "time" +) + +// Global lookup timeout. +const ( + globalLookupTimeout = time.Minute * 30 // 30minutes. +) + +// treeWalkerPoolInfo - tree walker pool info carries temporary walker +// channel stored until timeout is called. +type treeWalkerPoolInfo struct { + treeWalkerCh chan treeWalker + treeWalkerDoneCh chan struct{} + doneCh chan<- struct{} +} + +// treeWalkerPool - tree walker pool is a set of temporary tree walker +// objects. Any item stored in the pool will be removed automatically at +// a given timeOut value. This pool is safe for use by multiple +// goroutines simultaneously. pool's purpose is to cache tree walker +// channels for later reuse. +type treeWalkerPool struct { + pool map[listParams][]treeWalkerPoolInfo + timeOut time.Duration + lock *sync.Mutex +} + +// newTreeWalkerPool - initialize new tree walker pool. +func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool { + tPool := &treeWalkerPool{ + pool: make(map[listParams][]treeWalkerPoolInfo), + timeOut: timeout, + lock: &sync.Mutex{}, + } + return tPool +} + +// Release - selects an item from the pool based on the input +// listParams, removes it from the pool, and returns treeWalker +// channels. Release will return nil, if listParams is not +// recognized. +func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + treeWalk, ok := t.pool[params] + if ok { + if len(treeWalk) > 0 { + treeWalker := treeWalk[0] + if len(treeWalk[1:]) > 0 { + t.pool[params] = treeWalk[1:] + } else { + delete(t.pool, params) + } + treeWalker.doneCh <- struct{}{} + return treeWalker.treeWalkerCh, treeWalker.treeWalkerDoneCh + } + } + // Release return nil if params not found. + return nil, nil +} + +// Set - adds new list params along with treeWalker channel to the +// pool for future. Additionally this also starts a go routine which +// waits at the configured timeout. Additionally this go-routine is +// also closed pro-actively by 'Release' call when the treeWalker +// item is obtained from the pool. +func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + + var treeWalkerIdx = len(t.pool[params]) + var doneCh = make(chan struct{}) + t.pool[params] = append(t.pool[params], treeWalkerPoolInfo{ + treeWalkerCh: treeWalkerCh, + treeWalkerDoneCh: treeWalkerDoneCh, + doneCh: doneCh, + }) + + // Safe expiry of treeWalkerCh after timeout. + go func(doneCh <-chan struct{}) { + select { + // Wait until timeOut + case <-time.After(t.timeOut): + t.lock.Lock() + treeWalk := t.pool[params] + treeWalk = append(treeWalk[:treeWalkerIdx], treeWalk[treeWalkerIdx+1:]...) + if len(treeWalk) == 0 { + delete(t.pool, params) + } else { + t.pool[params] = treeWalk + } + close(treeWalkerDoneCh) + t.lock.Unlock() + case <-doneCh: + return + } + }(doneCh) +} diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 3a0ddf3c8..d983353bb 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -19,7 +19,6 @@ package main import ( "sort" "strings" - "time" ) // listParams - list object params used for list object map @@ -31,20 +30,12 @@ type listParams struct { } // Tree walk result carries results of tree walking. -type treeWalkResult struct { +type treeWalker struct { entry string err error end bool } -// Tree walk notify carries a channel which notifies tree walk -// results, additionally it also carries information if treeWalk -// should be timedOut. -type treeWalker struct { - ch <-chan treeWalkResult - timedOut bool -} - // listDir - listDir. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { @@ -62,7 +53,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // Skip the entries which do not match the filter. for i, entry := range entries { - if filter(entry) { + if !filter(entry) { entries[i] = "" continue } @@ -83,7 +74,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool { +func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, stackDepth int, isEnd bool) { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -99,14 +90,23 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } } entries, err := xl.listDir(bucket, prefixDir, func(entry string) bool { - return !strings.HasPrefix(entry, entryPrefixMatch) + return strings.HasPrefix(entry, entryPrefixMatch) }, isLeaf) if err != nil { - send(treeWalkResult{err: err}) - return false + select { + case <-doneCh: + if stackDepth == 0 { + close(treeWalkCh) + } + case treeWalkCh <- treeWalker{err: err}: + } + return } if len(entries) == 0 { - return true + if stackDepth == 0 { + close(treeWalkCh) + } + return } // example: @@ -116,12 +116,16 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return entries[i] >= markerDir }) entries = entries[idx:] - *count += len(entries) + if len(entries) == 0 { + if stackDepth == 0 { + close(treeWalkCh) + } + return + } for i, entry := range entries { if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. - *count-- continue } if recursive && !strings.HasSuffix(entry, slashSeparator) { @@ -130,11 +134,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // should not be skipped, instead it will need to be treeWalk()'ed into. // Skip if it is a file though as it would be listed in previous listing. - *count-- continue } } - if recursive && strings.HasSuffix(entry, slashSeparator) { // If the entry is a directory, we will need recurse into it. markerArg := "" @@ -143,23 +145,37 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // recursing into "four/" markerArg = markerBase } - *count-- prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if !xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) { - return false + if i == len(entries)-1 && stackDepth == 0 { + isEnd = true } + stackDepth++ + xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, stackDepth, isEnd) + stackDepth-- continue } - *count-- - if !send(treeWalkResult{entry: pathJoin(prefixDir, entry)}) { - return false + var isEOF bool + if stackDepth == 0 && i == len(entries)-1 { + isEOF = true + } else if i == len(entries)-1 && isEnd { + isEOF = true + } + select { + case <-doneCh: + if stackDepth == 0 { + close(treeWalkCh) + return + } + case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: } } - return true + if stackDepth == 0 { + close(treeWalkCh) + } } // Initiate a new treeWalk in a goroutine. -func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker { +func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, doneCh chan struct{}) chan treeWalker { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -170,8 +186,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" // and entryPrefixMatch="th" - ch := make(chan treeWalkResult, maxObjectList) - walkNotify := treeWalker{ch: ch} + treeWalkCh := make(chan treeWalker, maxObjectList) entryPrefixMatch := prefix prefixDir := "" lastIndex := strings.LastIndex(prefix, slashSeparator) @@ -179,58 +194,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, entryPrefixMatch = prefix[lastIndex+1:] prefixDir = prefix[:lastIndex+1] } - count := 0 marker = strings.TrimPrefix(marker, prefixDir) - go func() { - defer close(ch) - send := func(walkResult treeWalkResult) bool { - if count == 0 { - walkResult.end = true - } - timer := time.After(time.Second * 60) - select { - case ch <- walkResult: - return true - case <-timer: - walkNotify.timedOut = true - return false - } - } - xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf) - }() - return &walkNotify -} - -// Save the goroutine reference in the map -func (xl xlObjects) saveTreeWalk(params listParams, walker *treeWalker) { - xl.listObjectMapMutex.Lock() - defer xl.listObjectMapMutex.Unlock() - - walkers, _ := xl.listObjectMap[params] - walkers = append(walkers, walker) - - xl.listObjectMap[params] = walkers -} - -// Lookup the goroutine reference from map -func (xl xlObjects) lookupTreeWalk(params listParams) *treeWalker { - xl.listObjectMapMutex.Lock() - defer xl.listObjectMapMutex.Unlock() - - if walkChs, ok := xl.listObjectMap[params]; ok { - for i, walkCh := range walkChs { - if !walkCh.timedOut { - newWalkChs := walkChs[i+1:] - if len(newWalkChs) > 0 { - xl.listObjectMap[params] = newWalkChs - } else { - delete(xl.listObjectMap, params) - } - return walkCh - } - } - // As all channels are timed out, delete the map entry - delete(xl.listObjectMap, params) - } - return nil + go xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, 0, false) + return treeWalkCh } diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go index 2d5e8a71e..5708490f2 100644 --- a/xl-v1-list-objects.go +++ b/xl-v1-list-objects.go @@ -26,15 +26,17 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey recursive = false } - walker := xl.lookupTreeWalk(listParams{bucket, recursive, marker, prefix}) - if walker == nil { - walker = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject) + walkerCh, walkerDoneCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) + if walkerCh == nil { + walkerDoneCh = make(chan struct{}) + walkerCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, walkerDoneCh) } + var objInfos []ObjectInfo var eof bool var nextMarker string for i := 0; i < maxKeys; { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkerCh if !ok { // Closed channel. eof = true @@ -63,18 +65,18 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey return ListObjectsInfo{}, toObjectErr(err, bucket, prefix) } } - nextMarker = objInfo.Name objInfos = append(objInfos, objInfo) - if walkResult.end { + i++ + if walkResult.end == true { eof = true break } - i++ } + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { - xl.saveTreeWalk(params, walker) + xl.listPool.Set(params, walkerCh, walkerDoneCh) } result := ListObjectsInfo{IsTruncated: !eof} diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 844e3f66c..edec2d616 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -81,13 +81,14 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } // Validate if we need to list further depending on maxUploads. if maxUploads > 0 { - walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) - if walker == nil { - walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload) + walkerCh, walkerDoneCh := xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) + if walkerCh == nil { + walkerDoneCh = make(chan struct{}) + walkerCh = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload, walkerDoneCh) } // Collect uploads until we have reached maxUploads count to 0. for maxUploads > 0 { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkerCh if !ok { // Closed channel. eof = true @@ -110,10 +111,8 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark }) maxUploads-- if maxUploads == 0 { - if walkResult.end { - eof = true - break - } + eof = true + break } continue } @@ -142,7 +141,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } uploads = append(uploads, newUploads...) maxUploads -= len(newUploads) - if walkResult.end && end { + if end && walkResult.end { eof = true break } diff --git a/xl-v1.go b/xl-v1.go index 2211bbfd5..740ee0b70 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/minio/minio/pkg/disk" ) @@ -45,8 +44,7 @@ type xlObjects struct { writeQuorum int // writeQuorum minimum required disks to write data. // List pool management. - listObjectMap map[listParams][]*treeWalker - listObjectMapMutex *sync.Mutex + listPool *treeWalkerPool } // errXLMaxDisks - returned for reached maximum of disks. @@ -159,12 +157,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) { // Initialize xl objects. xl := xlObjects{ - physicalDisks: disks, - storageDisks: newPosixDisks, - dataBlocks: dataBlocks, - parityBlocks: parityBlocks, - listObjectMap: make(map[listParams][]*treeWalker), - listObjectMapMutex: &sync.Mutex{}, + physicalDisks: disks, + storageDisks: newPosixDisks, + dataBlocks: dataBlocks, + parityBlocks: parityBlocks, + listPool: newTreeWalkerPool(globalLookupTimeout), } // Figure out read and write quorum based on number of storage disks.