From 7a8b8cd0a10d81b7119db99d75d1a229746a0eea Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Mon, 4 Jul 2016 14:19:27 +0530 Subject: [PATCH] tree-walk: unify FS and XL tree-walk with functional approach. (#2027) --- fs-v1-multipart.go | 5 +- fs-v1.go | 5 +- tree-walk-fs.go | 147 -------------- tree-walk-xl.go => tree-walk.go | 103 +++++----- tree-walk_test.go | 349 ++++++++++++++++---------------- xl-v1-list-objects.go | 3 +- xl-v1-multipart.go | 3 +- 7 files changed, 233 insertions(+), 382 deletions(-) delete mode 100644 tree-walk-fs.go rename tree-walk-xl.go => tree-walk.go (60%) diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 429ab3072..ca7b4bebd 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -70,9 +70,8 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark walkResultCh, endWalkCh := fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) if walkResultCh == nil { endWalkCh = make(chan struct{}) - walkResultCh = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool { - return fs.isMultipartUpload(bucket, object) - }, endWalkCh) + listDir := listDirFactory(fs.isMultipartUpload, fs.storage) + walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, endWalkCh) } for maxUploads > 0 { walkResult, ok := <-walkResultCh diff --git a/fs-v1.go b/fs-v1.go index cc1c416b1..42c44b85a 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -468,9 +468,10 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) - walkResultCh = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool { + listDir := listDirFactory(func(bucket, object string) bool { return !strings.HasSuffix(object, slashSeparator) - }, endWalkCh) + }, fs.storage) + walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh) } var fileInfos []FileInfo var eof bool diff --git a/tree-walk-fs.go b/tree-walk-fs.go deleted file mode 100644 index b5ea4c656..000000000 --- a/tree-walk-fs.go +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "path" - "sort" - "strings" -) - -// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { - // Example: - // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively - // called with prefixDir="one/two/three/four/" and marker="five.txt" - - var markerBase, markerDir string - if marker != "" { - // Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt" - markerSplit := strings.SplitN(marker, slashSeparator, 2) - markerDir = markerSplit[0] - if len(markerSplit) == 2 { - markerDir += slashSeparator - markerBase = markerSplit[1] - } - } - entries, err := fs.storage.ListDir(bucket, prefixDir) - if err != nil { - select { - case <-endWalkCh: - return errWalkAbort - case resultCh <- treeWalkResult{err: err}: - return err - } - } - - for i, entry := range entries { - if entryPrefixMatch != "" { - if !strings.HasPrefix(entry, entryPrefixMatch) { - entries[i] = "" - continue - } - } - if isLeaf(bucket, pathJoin(prefixDir, entry)) { - entries[i] = strings.TrimSuffix(entry, slashSeparator) - } - } - sort.Strings(entries) - // Skip the empty strings - for len(entries) > 0 && entries[0] == "" { - entries = entries[1:] - } - if len(entries) == 0 { - return nil - } - // example: - // If markerDir="four/" Search() returns the index of "four/" in the sorted - // entries list so we skip all the entries till "four/" - idx := sort.Search(len(entries), func(i int) bool { - return entries[i] >= markerDir - }) - entries = entries[idx:] - for i, entry := range entries { - if i == 0 && markerDir == entry { - if !recursive { - // Skip as the marker would already be listed in the previous listing. - continue - } - if recursive && !strings.HasSuffix(entry, slashSeparator) { - // We should not skip for recursive listing and if markerDir is a directory - // for ex. if marker is "four/five.txt" markerDir will be "four/" which - // should not be skipped, instead it will need to be treeWalk()'ed into. - - // Skip if it is a file though as it would be listed in previous listing. - continue - } - } - - if recursive && strings.HasSuffix(entry, slashSeparator) { - // If the entry is a directory, we will need recurse into it. - markerArg := "" - if entry == markerDir { - // We need to pass "five.txt" as marker only if we are - // recursing into "four/" - markerArg = markerBase - } - prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - markIsEnd := i == len(entries)-1 && isEnd - if tErr := fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { - return tErr - } - continue - } - // EOF is set if we are at last entry and the caller indicated we at the end. - isEOF := ((i == len(entries)-1) && isEnd) - select { - case <-endWalkCh: - return errWalkAbort - case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}: - } - } - // Everything is listed - return nil -} - -// Initiate a new treeWalk in a goroutine. -func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult { - // Example 1 - // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" - // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" - // and entryPrefixMatch="" - - // Example 2 - // if prefix is "one/two/th" and marker is "one/two/three/four/five.txt" - // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" - // and entryPrefixMatch="th" - - resultCh := make(chan treeWalkResult, maxObjectList) - entryPrefixMatch := prefix - prefixDir := "" - lastIndex := strings.LastIndex(prefix, slashSeparator) - if lastIndex != -1 { - entryPrefixMatch = prefix[lastIndex+1:] - prefixDir = prefix[:lastIndex+1] - } - marker = strings.TrimPrefix(marker, prefixDir) - go func() { - isEnd := true // Indication to start walking the tree with end as true. - fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd) - close(resultCh) - }() - return resultCh -} diff --git a/tree-walk-xl.go b/tree-walk.go similarity index 60% rename from tree-walk-xl.go rename to tree-walk.go index b240068da..83b24583f 100644 --- a/tree-walk-xl.go +++ b/tree-walk.go @@ -28,51 +28,60 @@ type treeWalkResult struct { end bool } -// listDir - lists all the entries at a given prefix, takes additional params as filter and leaf detection. -// filter is required to filter out the listed entries usually this function is supposed to return -// true or false. -// isLeaf is required to differentiate between directories and objects, this is a special requirement for XL -// backend since objects are kept as directories, the only way to know if a directory is truly an object -// we validate if 'xl.json' exists at the leaf. isLeaf replies true/false based on the outcome of a Stat -// operation. -func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { - if disk == nil { - continue - } - entries, err = disk.ListDir(bucket, prefixDir) - if err != nil { - // For any reason disk was deleted or goes offline, continue - // and list form other disks if possible. - if err == errDiskNotFound || err == errFaultyDisk { - continue - } - break - } - // Skip the entries which do not match the filter. - for i, entry := range entries { - if !filter(entry) { - entries[i] = "" - continue - } - if strings.HasSuffix(entry, slashSeparator) && isLeaf(bucket, pathJoin(prefixDir, entry)) { - entries[i] = strings.TrimSuffix(entry, slashSeparator) - } - } - sort.Strings(entries) - // Skip the empty strings - for len(entries) > 0 && entries[0] == "" { - entries = entries[1:] - } - return entries, nil - } +// "listDir" function of type listDirFunc returned by listDirFactory() - explained below. +type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, err error) - // Return error at the end. - return nil, err +// Returns function "listDir" of the type listDirFunc. +// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. +// disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks. +func listDirFactory(isLeaf func(string, string) bool, disks ...StorageAPI) listDirFunc { + // listDir - lists all the entries at a given prefix and given entry in the prefix. + // isLeaf is used to detect if an entry is a leaf entry. There are four scenarios where isLeaf + // should behave differently: + // 1. FS backend object listing - isLeaf is true if the entry has a trailing "/" + // 2. FS backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json + // 3. XL backend object listing - isLeaf is true if the entry is a directory and contains xl.json + // 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json + listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, err error) { + for _, disk := range disks { + if disk == nil { + continue + } + entries, err = disk.ListDir(bucket, prefixDir) + if err != nil { + // For any reason disk was deleted or goes offline, continue + // and list from other disks if possible. + if err == errDiskNotFound || err == errFaultyDisk { + continue + } + break + } + // Skip the entries which do not match the prefixEntry. + for i, entry := range entries { + if !strings.HasPrefix(entry, prefixEntry) { + entries[i] = "" + continue + } + if isLeaf(bucket, pathJoin(prefixDir, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) + } + } + sort.Strings(entries) + // Skip the empty strings + for len(entries) > 0 && entries[0] == "" { + entries = entries[1:] + } + return entries, nil + } + + // Return error at the end. + return nil, err + } + return listDir } -// treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { +// treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files. +func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -87,9 +96,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin markerBase = markerSplit[1] } } - entries, err := xl.listDir(bucket, prefixDir, func(entry string) bool { - return strings.HasPrefix(entry, entryPrefixMatch) - }, isLeaf) + entries, err := listDir(bucket, prefixDir, entryPrefixMatch) if err != nil { select { case <-endWalkCh: @@ -141,7 +148,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked // true at the end of the treeWalk stream. markIsEnd := i == len(entries)-1 && isEnd - if tErr := xl.doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { + if tErr := doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, resultCh, endWalkCh, markIsEnd); tErr != nil { return tErr } continue @@ -160,7 +167,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin } // Initiate a new treeWalk in a goroutine. -func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult { +func startTreeWalk(bucket, prefix, marker string, recursive bool, listDir listDirFunc, endWalkCh chan struct{}) chan treeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -182,7 +189,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, marker = strings.TrimPrefix(marker, prefixDir) go func() { isEnd := true // Indication to start walking the tree with end as true. - xl.doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd) + doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, resultCh, endWalkCh, isEnd) close(resultCh) }() return resultCh diff --git a/tree-walk_test.go b/tree-walk_test.go index ed06050a2..1a7baef5a 100644 --- a/tree-walk_test.go +++ b/tree-walk_test.go @@ -17,43 +17,34 @@ package main import ( - "bytes" - "strconv" + "fmt" + "io/ioutil" "strings" "testing" "time" ) -// Helper function that invokes startTreeWalk depending on the type implementing objectLayer. -func startTreeWalk(obj ObjectLayer, bucket, prefix, marker string, - recursive bool, endWalkCh chan struct{}) chan treeWalkResult { - var twResultCh chan treeWalkResult - switch typ := obj.(type) { - case fsObjects: - twResultCh = typ.startTreeWalk(bucket, prefix, marker, true, - func(bucket, object string) bool { - return !strings.HasSuffix(object, slashSeparator) - }, endWalkCh) - case xlObjects: - twResultCh = typ.startTreeWalk(bucket, prefix, marker, true, - typ.isObject, endWalkCh) - } - return twResultCh +// Sample entries for the namespace. +var volume = "testvolume" +var files = []string{ + "d/e", + "d/f", + "d/g/h", + "i/j/k", + "lmn", } -// Helper function that creates a bucket, bucket and objects from objects []string. -func createObjNamespace(obj ObjectLayer, bucket string, objects []string) error { - // Make a bucket. - var err error - err = obj.MakeBucket(bucket) +// Helper function that creates a volume and files in it. +func createNamespace(disk StorageAPI, volume string, files []string) error { + // Make a volume. + err := disk.MakeVol(volume) if err != nil { return err } - // Create objects. - for _, object := range objects { - _, err = obj.PutObject(bucket, object, int64(len("hello")), - bytes.NewReader([]byte("hello")), nil) + // Create files. + for _, file := range files { + err = disk.AppendFile(volume, file, []byte{}) if err != nil { return err } @@ -61,32 +52,13 @@ func createObjNamespace(obj ObjectLayer, bucket string, objects []string) error return err } -// Wrapper for testTreeWalkPrefix to run the unit test for both FS and XL backend. -func TestTreeWalkPrefix(t *testing.T) { - ExecObjectLayerTest(t, testTreeWalkPrefix) -} - // Test if tree walker returns entries matching prefix alone are received // when a non empty prefix is supplied. -func testTreeWalkPrefix(obj ObjectLayer, instanceType string, t *testing.T) { - bucket := "abc" - objects := []string{ - "d/e", - "d/f", - "d/g/h", - "i/j/k", - "lmn", - } - - err := createObjNamespace(obj, bucket, objects) - if err != nil { - t.Fatal(err) - } - +func testTreeWalkPrefix(t *testing.T, listDir listDirFunc) { // Start the tree walk go-routine. prefix := "d/" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(obj, bucket, prefix, "", true, endWalkCh) + twResultCh := startTreeWalk(volume, prefix, "", true, listDir, endWalkCh) // Check if all entries received on the channel match the prefix. for res := range twResultCh { @@ -96,31 +68,12 @@ func testTreeWalkPrefix(obj ObjectLayer, instanceType string, t *testing.T) { } } -// Wrapper for testTreeWalkMarker to run the unit test for both FS and XL backend. -func TestTreeWalkMarker(t *testing.T) { - ExecObjectLayerTest(t, testTreeWalkMarker) -} - // Test if entries received on tree walk's channel appear after the supplied marker. -func testTreeWalkMarker(obj ObjectLayer, instanceType string, t *testing.T) { - bucket := "abc" - objects := []string{ - "d/e", - "d/f", - "d/g/h", - "i/j/k", - "lmn", - } - - err := createObjNamespace(obj, bucket, objects) - if err != nil { - t.Fatal(err) - } - +func testTreeWalkMarker(t *testing.T, listDir listDirFunc) { // Start the tree walk go-routine. prefix := "" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(obj, bucket, prefix, "d/g", true, endWalkCh) + twResultCh := startTreeWalk(volume, prefix, "d/g", true, listDir, endWalkCh) // Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel. expectedCount := 3 @@ -131,142 +84,178 @@ func testTreeWalkMarker(obj ObjectLayer, instanceType string, t *testing.T) { if expectedCount != actualCount { t.Errorf("Expected %d entries, actual no. of entries were %d", expectedCount, actualCount) } - } -// Wrapper for testTreeWalkAbort to run the unit test for both FS and XL backend. -func TestTreeWalkAbort(t *testing.T) { - ExecObjectLayerTest(t, testTreeWalkAbort) -} - -// Extend treeWalk type to provide a method to reset timeout -func (t *treeWalkPool) setTimeout(newTimeout time.Duration) { - t.timeOut = newTimeout -} - -// Helper function to set treewalk (idle) timeout -func setTimeout(obj ObjectLayer, newTimeout time.Duration) { - switch typ := obj.(type) { - case fsObjects: - typ.listPool.setTimeout(newTimeout) - case xlObjects: - typ.listPool.setTimeout(newTimeout) - +// Test tree-walk. +func TestTreeWalk(t *testing.T) { + fsDir, err := ioutil.TempDir("", "minio-") + if err != nil { + t.Errorf("Unable to create tmp directory: %s", err) } -} - -// Helper function to put the tree walk go-routine into the pool -func putbackTreeWalk(obj ObjectLayer, params listParams, resultCh chan treeWalkResult, endWalkCh chan struct{}) { - switch typ := obj.(type) { - case fsObjects: - typ.listPool.Set(params, resultCh, endWalkCh) - case xlObjects: - typ.listPool.Set(params, resultCh, endWalkCh) - - } -} - -// Test if tree walk go-routine exits cleanly if tree walk is aborted before compeletion. -func testTreeWalkAbort(obj ObjectLayer, instanceType string, t *testing.T) { - bucket := "abc" - - var objects []string - for i := 0; i < 1001; i++ { - objects = append(objects, "obj"+strconv.Itoa(i)) + disk, err := newStorageAPI(fsDir) + if err != nil { + t.Errorf("Unable to create StorageAPI: %s", err) } - err := createObjNamespace(obj, bucket, objects) + err = createNamespace(disk, volume, files) if err != nil { t.Fatal(err) } - // Set treewalk pool timeout to be test friendly - setTimeout(obj, 2*time.Second) - - // Start the tree walk go-routine. - prefix := "" - marker := "" - recursive := true - endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(obj, bucket, prefix, marker, recursive, endWalkCh) - - // Pull one result entry from the tree walk result channel. - <-twResultCh - - // Put the treewalk go-routine into tree walk pool - putbackTreeWalk(obj, listParams{bucket, recursive, marker, prefix}, twResultCh, endWalkCh) - - // Confirm that endWalkCh is closed on tree walk pool timer expiry - if _, open := <-endWalkCh; open { - t.Error("Expected tree walk endWalk channel to be closed, found to be open") - } - - // Drain the buffered channel result channel of entries that were pushed before - // it was signalled to abort. - for range twResultCh { - } - if _, open := <-twResultCh; open { - t.Error("Expected tree walk result channel to be closed, found to be open") + listDir := listDirFactory(func(volume, prefix string) bool { + return !strings.HasSuffix(prefix, slashSeparator) + }, disk) + // Simple test for prefix based walk. + testTreeWalkPrefix(t, listDir) + // Simple test when marker is set. + testTreeWalkMarker(t, listDir) + err = removeAll(fsDir) + if err != nil { + t.Fatal(err) } } -// Helper function to get a slice of disks depending on the backend -func getPhysicalDisks(obj ObjectLayer) []string { - switch typ := obj.(type) { - case fsObjects: - return []string{typ.physicalDisk} - case xlObjects: - return typ.physicalDisks +// Test if tree walk go-routine exits cleanly if tree walk is aborted because of timeout. +func TestTreeWalkTimeout(t *testing.T) { + fsDir, err := ioutil.TempDir("", "minio-") + if err != nil { + t.Errorf("Unable to create tmp directory: %s", err) } - return []string{} -} - -// Wrapper for testTreeWalkFailedDisks to run the unit test for both FS and XL backend. -func TestTreeWalkFailedDisks(t *testing.T) { - ExecObjectLayerTest(t, testTreeWalkFailedDisks) -} - -// Test if tree walk go routine exits cleanly when more than quorum number of disks fail -// in XL and the single disk in FS. -func testTreeWalkFailedDisks(obj ObjectLayer, instanceType string, t *testing.T) { - bucket := "abc" - objects := []string{ - "d/e", - "d/f", - "d/g/h", - "i/j/k", - "lmn", + disk, err := newStorageAPI(fsDir) + if err != nil { + t.Errorf("Unable to create StorageAPI: %s", err) } - - err := createObjNamespace(obj, bucket, objects) + var files []string + // Create maxObjectsList+1 number of entries. + for i := 0; i < maxObjectList+1; i++ { + files = append(files, fmt.Sprintf("file.%d", i)) + } + err = createNamespace(disk, volume, files) if err != nil { t.Fatal(err) } - // Simulate disk failures by removing the directories backing them - disks := getPhysicalDisks(obj) - switch obj.(type) { - case fsObjects: - removeDiskN(disks, 1) - case xlObjects: - removeDiskN(disks, len(disks)) - } + listDir := listDirFactory(func(volume, prefix string) bool { + return !strings.HasSuffix(prefix, slashSeparator) + }, disk) - // Start the tree walk go-routine. + // TreeWalk pool with 2 seconds timeout for tree-walk go routines. + pool := newTreeWalkPool(2 * time.Second) + + endWalkCh := make(chan struct{}) prefix := "" marker := "" recursive := true - endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(obj, bucket, prefix, marker, recursive, endWalkCh) + resultCh := startTreeWalk(volume, prefix, marker, recursive, listDir, endWalkCh) - if res := <-twResultCh; res.err.Error() != "disk not found" { - t.Error("Expected disk not found error") + params := listParams{ + bucket: volume, + recursive: recursive, + } + // Add Treewalk to the pool. + pool.Set(params, resultCh, endWalkCh) + + // Wait for the Treewalk to timeout. + <-time.After(3 * time.Second) + + // Read maxObjectList number of entries from the channel. + // maxObjectsList number of entries would have been filled into the resultCh + // buffered channel. After the timeout resultCh would get closed and hence the + // maxObjectsList+1 entry would not be sent in the channel. + i := 0 + for range resultCh { + i++ + if i == maxObjectList { + break + } } + // The last entry will not be received as the Treewalk goroutine would have exited. + _, ok := <-resultCh + if ok { + t.Error("Tree-walk go routine has not exited after timeout.") + } + err = removeAll(fsDir) + if err != nil { + t.Error(err) + } } -// FIXME: Test the abort timeout when the tree-walk go routine is 'parked' in -// the pool. Currently, we need to create objects greater than maxObjectList -// (== 1000) which would increase time to run the test. If (and when) we decide -// to make maxObjectList configurable we can re-evaluate adding a unit test for -// this. +// Test ListDir - listDir should list entries from the first disk, if the first disk is down, +// it should list from the next disk. +func TestListDir(t *testing.T) { + file1 := "file1" + file2 := "file2" + // Create two backend directories fsDir1 and fsDir2. + fsDir1, err := ioutil.TempDir("", "minio-") + if err != nil { + t.Errorf("Unable to create tmp directory: %s", err) + } + fsDir2, err := ioutil.TempDir("", "minio-") + if err != nil { + t.Errorf("Unable to create tmp directory: %s", err) + } + + // Create two StorageAPIs disk1 and disk2. + disk1, err := newStorageAPI(fsDir1) + if err != nil { + t.Errorf("Unable to create StorageAPI: %s", err) + } + disk2, err := newStorageAPI(fsDir2) + if err != nil { + t.Errorf("Unable to create StorageAPI: %s", err) + } + + // create listDir function. + listDir := listDirFactory(func(volume, prefix string) bool { + return !strings.HasSuffix(prefix, slashSeparator) + }, disk1, disk2) + + // Create file1 in fsDir1 and file2 in fsDir2. + disks := []StorageAPI{disk1, disk2} + for i, disk := range disks { + err = createNamespace(disk, volume, []string{fmt.Sprintf("file%d", i+1)}) + if err != nil { + t.Fatal(err) + } + } + + // Should list "file1" from fsDir1. + entries, err := listDir(volume, "", "") + if err != nil { + t.Error(err) + } + if len(entries) != 1 { + t.Fatal("Expected the number of entries to be 1") + } + if entries[0] != file1 { + t.Fatal("Expected the entry to be file1") + } + + // Remove fsDir1 to test failover. + err = removeAll(fsDir1) + if err != nil { + t.Error(err) + } + + // Should list "file2" from fsDir2. + entries, err = listDir(volume, "", "") + if err != nil { + t.Error(err) + } + if len(entries) != 1 { + t.Fatal("Expected the number of entries to be 1") + } + if entries[0] != file2 { + t.Fatal("Expected the entry to be file2") + } + err = removeAll(fsDir2) + if err != nil { + t.Error(err) + } + // None of the disks are available, should get errDiskNotFound. + entries, err = listDir(volume, "", "") + if err != errDiskNotFound { + t.Error("expected errDiskNotFound error.") + } +} diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go index 8dfe13fd7..96172740e 100644 --- a/xl-v1-list-objects.go +++ b/xl-v1-list-objects.go @@ -29,7 +29,8 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) - walkResultCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, endWalkCh) + listDir := listDirFactory(xl.isObject, xl.getLoadBalancedQuorumDisks()...) + walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh) } var objInfos []ObjectInfo diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index b931aed0d..7e769da1a 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -84,7 +84,8 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark walkerCh, walkerDoneCh := xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) if walkerCh == nil { walkerDoneCh = make(chan struct{}) - walkerCh = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload, walkerDoneCh) + listDir := listDirFactory(xl.isMultipartUpload, xl.getLoadBalancedQuorumDisks()...) + walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, walkerDoneCh) } // Collect uploads until we have reached maxUploads count to 0. for maxUploads > 0 {