mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
tree-walk: unify FS and XL tree-walk with functional approach. (#2027)
This commit is contained in:
parent
a8a3e95835
commit
7a8b8cd0a1
@ -70,9 +70,8 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
walkResultCh, endWalkCh := fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
|
||||
if walkResultCh == nil {
|
||||
endWalkCh = make(chan struct{})
|
||||
walkResultCh = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool {
|
||||
return fs.isMultipartUpload(bucket, object)
|
||||
}, endWalkCh)
|
||||
listDir := listDirFactory(fs.isMultipartUpload, fs.storage)
|
||||
walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, endWalkCh)
|
||||
}
|
||||
for maxUploads > 0 {
|
||||
walkResult, ok := <-walkResultCh
|
||||
|
5
fs-v1.go
5
fs-v1.go
@ -468,9 +468,10 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
|
||||
walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
|
||||
if walkResultCh == nil {
|
||||
endWalkCh = make(chan struct{})
|
||||
walkResultCh = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool {
|
||||
listDir := listDirFactory(func(bucket, object string) bool {
|
||||
return !strings.HasSuffix(object, slashSeparator)
|
||||
}, endWalkCh)
|
||||
}, fs.storage)
|
||||
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh)
|
||||
}
|
||||
var fileInfos []FileInfo
|
||||
var eof bool
|
||||
|
147
tree-walk-fs.go
147
tree-walk-fs.go
@ -1,147 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
|
||||
func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
|
||||
// Example:
|
||||
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
|
||||
// called with prefixDir="one/two/three/four/" and marker="five.txt"
|
||||
|
||||
var markerBase, markerDir string
|
||||
if marker != "" {
|
||||
// Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt"
|
||||
markerSplit := strings.SplitN(marker, slashSeparator, 2)
|
||||
markerDir = markerSplit[0]
|
||||
if len(markerSplit) == 2 {
|
||||
markerDir += slashSeparator
|
||||
markerBase = markerSplit[1]
|
||||
}
|
||||
}
|
||||
entries, err := fs.storage.ListDir(bucket, prefixDir)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-endWalkCh:
|
||||
return errWalkAbort
|
||||
case resultCh <- treeWalkResult{err: err}:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for i, entry := range entries {
|
||||
if entryPrefixMatch != "" {
|
||||
if !strings.HasPrefix(entry, entryPrefixMatch) {
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
}
|
||||
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
}
|
||||
sort.Strings(entries)
|
||||
// Skip the empty strings
|
||||
for len(entries) > 0 && entries[0] == "" {
|
||||
entries = entries[1:]
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return nil
|
||||
}
|
||||
// example:
|
||||
// If markerDir="four/" Search() returns the index of "four/" in the sorted
|
||||
// entries list so we skip all the entries till "four/"
|
||||
idx := sort.Search(len(entries), func(i int) bool {
|
||||
return entries[i] >= markerDir
|
||||
})
|
||||
entries = entries[idx:]
|
||||
for i, entry := range entries {
|
||||
if i == 0 && markerDir == entry {
|
||||
if !recursive {
|
||||
// Skip as the marker would already be listed in the previous listing.
|
||||
continue
|
||||
}
|
||||
if recursive && !strings.HasSuffix(entry, slashSeparator) {
|
||||
// We should not skip for recursive listing and if markerDir is a directory
|
||||
// for ex. if marker is "four/five.txt" markerDir will be "four/" which
|
||||
// should not be skipped, instead it will need to be treeWalk()'ed into.
|
||||
|
||||
// Skip if it is a file though as it would be listed in previous listing.
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if recursive && strings.HasSuffix(entry, slashSeparator) {
|
||||
// If the entry is a directory, we will need recurse into it.
|
||||
markerArg := ""
|
||||
if entry == markerDir {
|
||||
// We need to pass "five.txt" as marker only if we are
|
||||
// recursing into "four/"
|
||||
markerArg = markerBase
|
||||
}
|
||||
prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
|
||||
markIsEnd := i == len(entries)-1 && isEnd
|
||||
if tErr := fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
|
||||
return tErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
// EOF is set if we are at last entry and the caller indicated we at the end.
|
||||
isEOF := ((i == len(entries)-1) && isEnd)
|
||||
select {
|
||||
case <-endWalkCh:
|
||||
return errWalkAbort
|
||||
case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}:
|
||||
}
|
||||
}
|
||||
// Everything is listed
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initiate a new treeWalk in a goroutine.
|
||||
func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult {
|
||||
// Example 1
|
||||
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
|
||||
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
|
||||
// and entryPrefixMatch=""
|
||||
|
||||
// Example 2
|
||||
// if prefix is "one/two/th" and marker is "one/two/three/four/five.txt"
|
||||
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
|
||||
// and entryPrefixMatch="th"
|
||||
|
||||
resultCh := make(chan treeWalkResult, maxObjectList)
|
||||
entryPrefixMatch := prefix
|
||||
prefixDir := ""
|
||||
lastIndex := strings.LastIndex(prefix, slashSeparator)
|
||||
if lastIndex != -1 {
|
||||
entryPrefixMatch = prefix[lastIndex+1:]
|
||||
prefixDir = prefix[:lastIndex+1]
|
||||
}
|
||||
marker = strings.TrimPrefix(marker, prefixDir)
|
||||
go func() {
|
||||
isEnd := true // Indication to start walking the tree with end as true.
|
||||
fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd)
|
||||
close(resultCh)
|
||||
}()
|
||||
return resultCh
|
||||
}
|
@ -28,51 +28,60 @@ type treeWalkResult struct {
|
||||
end bool
|
||||
}
|
||||
|
||||
// listDir - lists all the entries at a given prefix, takes additional params as filter and leaf detection.
|
||||
// filter is required to filter out the listed entries usually this function is supposed to return
|
||||
// true or false.
|
||||
// isLeaf is required to differentiate between directories and objects, this is a special requirement for XL
|
||||
// backend since objects are kept as directories, the only way to know if a directory is truly an object
|
||||
// we validate if 'xl.json' exists at the leaf. isLeaf replies true/false based on the outcome of a Stat
|
||||
// operation.
|
||||
func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) {
|
||||
for _, disk := range xl.getLoadBalancedQuorumDisks() {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
entries, err = disk.ListDir(bucket, prefixDir)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list form other disks if possible.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
// Skip the entries which do not match the filter.
|
||||
for i, entry := range entries {
|
||||
if !filter(entry) {
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if strings.HasSuffix(entry, slashSeparator) && isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
}
|
||||
sort.Strings(entries)
|
||||
// Skip the empty strings
|
||||
for len(entries) > 0 && entries[0] == "" {
|
||||
entries = entries[1:]
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
// "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
|
||||
type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, err error)
|
||||
|
||||
// Return error at the end.
|
||||
return nil, err
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
|
||||
// disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks.
|
||||
func listDirFactory(isLeaf func(string, string) bool, disks ...StorageAPI) listDirFunc {
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
// isLeaf is used to detect if an entry is a leaf entry. There are four scenarios where isLeaf
|
||||
// should behave differently:
|
||||
// 1. FS backend object listing - isLeaf is true if the entry has a trailing "/"
|
||||
// 2. FS backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json
|
||||
// 3. XL backend object listing - isLeaf is true if the entry is a directory and contains xl.json
|
||||
// 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, err error) {
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
entries, err = disk.ListDir(bucket, prefixDir)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
if err == errDiskNotFound || err == errFaultyDisk {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
// Skip the entries which do not match the prefixEntry.
|
||||
for i, entry := range entries {
|
||||
if !strings.HasPrefix(entry, prefixEntry) {
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
|
||||
entries[i] = strings.TrimSuffix(entry, slashSeparator)
|
||||
}
|
||||
}
|
||||
sort.Strings(entries)
|
||||
// Skip the empty strings
|
||||
for len(entries) > 0 && entries[0] == "" {
|
||||
entries = entries[1:]
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Return error at the end.
|
||||
return nil, err
|
||||
}
|
||||
return listDir
|
||||
}
|
||||
|
||||
// treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
|
||||
func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
|
||||
// treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files.
|
||||
func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
|
||||
// Example:
|
||||
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
|
||||
// called with prefixDir="one/two/three/four/" and marker="five.txt"
|
||||
@ -87,9 +96,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin
|
||||
markerBase = markerSplit[1]
|
||||
}
|
||||
}
|
||||
entries, err := xl.listDir(bucket, prefixDir, func(entry string) bool {
|
||||
return strings.HasPrefix(entry, entryPrefixMatch)
|
||||
}, isLeaf)
|
||||
entries, err := listDir(bucket, prefixDir, entryPrefixMatch)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-endWalkCh:
|
||||
@ -141,7 +148,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin
|
||||
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
|
||||
// true at the end of the treeWalk stream.
|
||||
markIsEnd := i == len(entries)-1 && isEnd
|
||||
if tErr := xl.doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
|
||||
if tErr := doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, resultCh, endWalkCh, markIsEnd); tErr != nil {
|
||||
return tErr
|
||||
}
|
||||
continue
|
||||
@ -160,7 +167,7 @@ func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker strin
|
||||
}
|
||||
|
||||
// Initiate a new treeWalk in a goroutine.
|
||||
func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult {
|
||||
func startTreeWalk(bucket, prefix, marker string, recursive bool, listDir listDirFunc, endWalkCh chan struct{}) chan treeWalkResult {
|
||||
// Example 1
|
||||
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
|
||||
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
|
||||
@ -182,7 +189,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
|
||||
marker = strings.TrimPrefix(marker, prefixDir)
|
||||
go func() {
|
||||
isEnd := true // Indication to start walking the tree with end as true.
|
||||
xl.doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd)
|
||||
doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, resultCh, endWalkCh, isEnd)
|
||||
close(resultCh)
|
||||
}()
|
||||
return resultCh
|
@ -17,43 +17,34 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Helper function that invokes startTreeWalk depending on the type implementing objectLayer.
|
||||
func startTreeWalk(obj ObjectLayer, bucket, prefix, marker string,
|
||||
recursive bool, endWalkCh chan struct{}) chan treeWalkResult {
|
||||
var twResultCh chan treeWalkResult
|
||||
switch typ := obj.(type) {
|
||||
case fsObjects:
|
||||
twResultCh = typ.startTreeWalk(bucket, prefix, marker, true,
|
||||
func(bucket, object string) bool {
|
||||
return !strings.HasSuffix(object, slashSeparator)
|
||||
}, endWalkCh)
|
||||
case xlObjects:
|
||||
twResultCh = typ.startTreeWalk(bucket, prefix, marker, true,
|
||||
typ.isObject, endWalkCh)
|
||||
}
|
||||
return twResultCh
|
||||
// Sample entries for the namespace.
|
||||
var volume = "testvolume"
|
||||
var files = []string{
|
||||
"d/e",
|
||||
"d/f",
|
||||
"d/g/h",
|
||||
"i/j/k",
|
||||
"lmn",
|
||||
}
|
||||
|
||||
// Helper function that creates a bucket, bucket and objects from objects []string.
|
||||
func createObjNamespace(obj ObjectLayer, bucket string, objects []string) error {
|
||||
// Make a bucket.
|
||||
var err error
|
||||
err = obj.MakeBucket(bucket)
|
||||
// Helper function that creates a volume and files in it.
|
||||
func createNamespace(disk StorageAPI, volume string, files []string) error {
|
||||
// Make a volume.
|
||||
err := disk.MakeVol(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create objects.
|
||||
for _, object := range objects {
|
||||
_, err = obj.PutObject(bucket, object, int64(len("hello")),
|
||||
bytes.NewReader([]byte("hello")), nil)
|
||||
// Create files.
|
||||
for _, file := range files {
|
||||
err = disk.AppendFile(volume, file, []byte{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -61,32 +52,13 @@ func createObjNamespace(obj ObjectLayer, bucket string, objects []string) error
|
||||
return err
|
||||
}
|
||||
|
||||
// Wrapper for testTreeWalkPrefix to run the unit test for both FS and XL backend.
|
||||
func TestTreeWalkPrefix(t *testing.T) {
|
||||
ExecObjectLayerTest(t, testTreeWalkPrefix)
|
||||
}
|
||||
|
||||
// Test if tree walker returns entries matching prefix alone are received
|
||||
// when a non empty prefix is supplied.
|
||||
func testTreeWalkPrefix(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
bucket := "abc"
|
||||
objects := []string{
|
||||
"d/e",
|
||||
"d/f",
|
||||
"d/g/h",
|
||||
"i/j/k",
|
||||
"lmn",
|
||||
}
|
||||
|
||||
err := createObjNamespace(obj, bucket, objects)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc) {
|
||||
// Start the tree walk go-routine.
|
||||
prefix := "d/"
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(obj, bucket, prefix, "", true, endWalkCh)
|
||||
twResultCh := startTreeWalk(volume, prefix, "", true, listDir, endWalkCh)
|
||||
|
||||
// Check if all entries received on the channel match the prefix.
|
||||
for res := range twResultCh {
|
||||
@ -96,31 +68,12 @@ func testTreeWalkPrefix(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Wrapper for testTreeWalkMarker to run the unit test for both FS and XL backend.
|
||||
func TestTreeWalkMarker(t *testing.T) {
|
||||
ExecObjectLayerTest(t, testTreeWalkMarker)
|
||||
}
|
||||
|
||||
// Test if entries received on tree walk's channel appear after the supplied marker.
|
||||
func testTreeWalkMarker(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
bucket := "abc"
|
||||
objects := []string{
|
||||
"d/e",
|
||||
"d/f",
|
||||
"d/g/h",
|
||||
"i/j/k",
|
||||
"lmn",
|
||||
}
|
||||
|
||||
err := createObjNamespace(obj, bucket, objects)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
func testTreeWalkMarker(t *testing.T, listDir listDirFunc) {
|
||||
// Start the tree walk go-routine.
|
||||
prefix := ""
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(obj, bucket, prefix, "d/g", true, endWalkCh)
|
||||
twResultCh := startTreeWalk(volume, prefix, "d/g", true, listDir, endWalkCh)
|
||||
|
||||
// Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel.
|
||||
expectedCount := 3
|
||||
@ -131,142 +84,178 @@ func testTreeWalkMarker(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
if expectedCount != actualCount {
|
||||
t.Errorf("Expected %d entries, actual no. of entries were %d", expectedCount, actualCount)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Wrapper for testTreeWalkAbort to run the unit test for both FS and XL backend.
|
||||
func TestTreeWalkAbort(t *testing.T) {
|
||||
ExecObjectLayerTest(t, testTreeWalkAbort)
|
||||
}
|
||||
|
||||
// Extend treeWalk type to provide a method to reset timeout
|
||||
func (t *treeWalkPool) setTimeout(newTimeout time.Duration) {
|
||||
t.timeOut = newTimeout
|
||||
}
|
||||
|
||||
// Helper function to set treewalk (idle) timeout
|
||||
func setTimeout(obj ObjectLayer, newTimeout time.Duration) {
|
||||
switch typ := obj.(type) {
|
||||
case fsObjects:
|
||||
typ.listPool.setTimeout(newTimeout)
|
||||
case xlObjects:
|
||||
typ.listPool.setTimeout(newTimeout)
|
||||
|
||||
// Test tree-walk.
|
||||
func TestTreeWalk(t *testing.T) {
|
||||
fsDir, err := ioutil.TempDir("", "minio-")
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create tmp directory: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to put the tree walk go-routine into the pool
|
||||
func putbackTreeWalk(obj ObjectLayer, params listParams, resultCh chan treeWalkResult, endWalkCh chan struct{}) {
|
||||
switch typ := obj.(type) {
|
||||
case fsObjects:
|
||||
typ.listPool.Set(params, resultCh, endWalkCh)
|
||||
case xlObjects:
|
||||
typ.listPool.Set(params, resultCh, endWalkCh)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Test if tree walk go-routine exits cleanly if tree walk is aborted before compeletion.
|
||||
func testTreeWalkAbort(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
bucket := "abc"
|
||||
|
||||
var objects []string
|
||||
for i := 0; i < 1001; i++ {
|
||||
objects = append(objects, "obj"+strconv.Itoa(i))
|
||||
disk, err := newStorageAPI(fsDir)
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
err := createObjNamespace(obj, bucket, objects)
|
||||
err = createNamespace(disk, volume, files)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Set treewalk pool timeout to be test friendly
|
||||
setTimeout(obj, 2*time.Second)
|
||||
|
||||
// Start the tree walk go-routine.
|
||||
prefix := ""
|
||||
marker := ""
|
||||
recursive := true
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(obj, bucket, prefix, marker, recursive, endWalkCh)
|
||||
|
||||
// Pull one result entry from the tree walk result channel.
|
||||
<-twResultCh
|
||||
|
||||
// Put the treewalk go-routine into tree walk pool
|
||||
putbackTreeWalk(obj, listParams{bucket, recursive, marker, prefix}, twResultCh, endWalkCh)
|
||||
|
||||
// Confirm that endWalkCh is closed on tree walk pool timer expiry
|
||||
if _, open := <-endWalkCh; open {
|
||||
t.Error("Expected tree walk endWalk channel to be closed, found to be open")
|
||||
}
|
||||
|
||||
// Drain the buffered channel result channel of entries that were pushed before
|
||||
// it was signalled to abort.
|
||||
for range twResultCh {
|
||||
}
|
||||
if _, open := <-twResultCh; open {
|
||||
t.Error("Expected tree walk result channel to be closed, found to be open")
|
||||
listDir := listDirFactory(func(volume, prefix string) bool {
|
||||
return !strings.HasSuffix(prefix, slashSeparator)
|
||||
}, disk)
|
||||
// Simple test for prefix based walk.
|
||||
testTreeWalkPrefix(t, listDir)
|
||||
// Simple test when marker is set.
|
||||
testTreeWalkMarker(t, listDir)
|
||||
err = removeAll(fsDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to get a slice of disks depending on the backend
|
||||
func getPhysicalDisks(obj ObjectLayer) []string {
|
||||
switch typ := obj.(type) {
|
||||
case fsObjects:
|
||||
return []string{typ.physicalDisk}
|
||||
case xlObjects:
|
||||
return typ.physicalDisks
|
||||
// Test if tree walk go-routine exits cleanly if tree walk is aborted because of timeout.
|
||||
func TestTreeWalkTimeout(t *testing.T) {
|
||||
fsDir, err := ioutil.TempDir("", "minio-")
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create tmp directory: %s", err)
|
||||
}
|
||||
return []string{}
|
||||
}
|
||||
|
||||
// Wrapper for testTreeWalkFailedDisks to run the unit test for both FS and XL backend.
|
||||
func TestTreeWalkFailedDisks(t *testing.T) {
|
||||
ExecObjectLayerTest(t, testTreeWalkFailedDisks)
|
||||
}
|
||||
|
||||
// Test if tree walk go routine exits cleanly when more than quorum number of disks fail
|
||||
// in XL and the single disk in FS.
|
||||
func testTreeWalkFailedDisks(obj ObjectLayer, instanceType string, t *testing.T) {
|
||||
bucket := "abc"
|
||||
objects := []string{
|
||||
"d/e",
|
||||
"d/f",
|
||||
"d/g/h",
|
||||
"i/j/k",
|
||||
"lmn",
|
||||
disk, err := newStorageAPI(fsDir)
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
err := createObjNamespace(obj, bucket, objects)
|
||||
var files []string
|
||||
// Create maxObjectsList+1 number of entries.
|
||||
for i := 0; i < maxObjectList+1; i++ {
|
||||
files = append(files, fmt.Sprintf("file.%d", i))
|
||||
}
|
||||
err = createNamespace(disk, volume, files)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Simulate disk failures by removing the directories backing them
|
||||
disks := getPhysicalDisks(obj)
|
||||
switch obj.(type) {
|
||||
case fsObjects:
|
||||
removeDiskN(disks, 1)
|
||||
case xlObjects:
|
||||
removeDiskN(disks, len(disks))
|
||||
}
|
||||
listDir := listDirFactory(func(volume, prefix string) bool {
|
||||
return !strings.HasSuffix(prefix, slashSeparator)
|
||||
}, disk)
|
||||
|
||||
// Start the tree walk go-routine.
|
||||
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
|
||||
pool := newTreeWalkPool(2 * time.Second)
|
||||
|
||||
endWalkCh := make(chan struct{})
|
||||
prefix := ""
|
||||
marker := ""
|
||||
recursive := true
|
||||
endWalkCh := make(chan struct{})
|
||||
twResultCh := startTreeWalk(obj, bucket, prefix, marker, recursive, endWalkCh)
|
||||
resultCh := startTreeWalk(volume, prefix, marker, recursive, listDir, endWalkCh)
|
||||
|
||||
if res := <-twResultCh; res.err.Error() != "disk not found" {
|
||||
t.Error("Expected disk not found error")
|
||||
params := listParams{
|
||||
bucket: volume,
|
||||
recursive: recursive,
|
||||
}
|
||||
// Add Treewalk to the pool.
|
||||
pool.Set(params, resultCh, endWalkCh)
|
||||
|
||||
// Wait for the Treewalk to timeout.
|
||||
<-time.After(3 * time.Second)
|
||||
|
||||
// Read maxObjectList number of entries from the channel.
|
||||
// maxObjectsList number of entries would have been filled into the resultCh
|
||||
// buffered channel. After the timeout resultCh would get closed and hence the
|
||||
// maxObjectsList+1 entry would not be sent in the channel.
|
||||
i := 0
|
||||
for range resultCh {
|
||||
i++
|
||||
if i == maxObjectList {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// The last entry will not be received as the Treewalk goroutine would have exited.
|
||||
_, ok := <-resultCh
|
||||
if ok {
|
||||
t.Error("Tree-walk go routine has not exited after timeout.")
|
||||
}
|
||||
err = removeAll(fsDir)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: Test the abort timeout when the tree-walk go routine is 'parked' in
|
||||
// the pool. Currently, we need to create objects greater than maxObjectList
|
||||
// (== 1000) which would increase time to run the test. If (and when) we decide
|
||||
// to make maxObjectList configurable we can re-evaluate adding a unit test for
|
||||
// this.
|
||||
// Test ListDir - listDir should list entries from the first disk, if the first disk is down,
|
||||
// it should list from the next disk.
|
||||
func TestListDir(t *testing.T) {
|
||||
file1 := "file1"
|
||||
file2 := "file2"
|
||||
// Create two backend directories fsDir1 and fsDir2.
|
||||
fsDir1, err := ioutil.TempDir("", "minio-")
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create tmp directory: %s", err)
|
||||
}
|
||||
fsDir2, err := ioutil.TempDir("", "minio-")
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create tmp directory: %s", err)
|
||||
}
|
||||
|
||||
// Create two StorageAPIs disk1 and disk2.
|
||||
disk1, err := newStorageAPI(fsDir1)
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
disk2, err := newStorageAPI(fsDir2)
|
||||
if err != nil {
|
||||
t.Errorf("Unable to create StorageAPI: %s", err)
|
||||
}
|
||||
|
||||
// create listDir function.
|
||||
listDir := listDirFactory(func(volume, prefix string) bool {
|
||||
return !strings.HasSuffix(prefix, slashSeparator)
|
||||
}, disk1, disk2)
|
||||
|
||||
// Create file1 in fsDir1 and file2 in fsDir2.
|
||||
disks := []StorageAPI{disk1, disk2}
|
||||
for i, disk := range disks {
|
||||
err = createNamespace(disk, volume, []string{fmt.Sprintf("file%d", i+1)})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Should list "file1" from fsDir1.
|
||||
entries, err := listDir(volume, "", "")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(entries) != 1 {
|
||||
t.Fatal("Expected the number of entries to be 1")
|
||||
}
|
||||
if entries[0] != file1 {
|
||||
t.Fatal("Expected the entry to be file1")
|
||||
}
|
||||
|
||||
// Remove fsDir1 to test failover.
|
||||
err = removeAll(fsDir1)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Should list "file2" from fsDir2.
|
||||
entries, err = listDir(volume, "", "")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(entries) != 1 {
|
||||
t.Fatal("Expected the number of entries to be 1")
|
||||
}
|
||||
if entries[0] != file2 {
|
||||
t.Fatal("Expected the entry to be file2")
|
||||
}
|
||||
err = removeAll(fsDir2)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// None of the disks are available, should get errDiskNotFound.
|
||||
entries, err = listDir(volume, "", "")
|
||||
if err != errDiskNotFound {
|
||||
t.Error("expected errDiskNotFound error.")
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,8 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
|
||||
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
|
||||
if walkResultCh == nil {
|
||||
endWalkCh = make(chan struct{})
|
||||
walkResultCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, endWalkCh)
|
||||
listDir := listDirFactory(xl.isObject, xl.getLoadBalancedQuorumDisks()...)
|
||||
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh)
|
||||
}
|
||||
|
||||
var objInfos []ObjectInfo
|
||||
|
@ -84,7 +84,8 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
|
||||
walkerCh, walkerDoneCh := xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
|
||||
if walkerCh == nil {
|
||||
walkerDoneCh = make(chan struct{})
|
||||
walkerCh = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload, walkerDoneCh)
|
||||
listDir := listDirFactory(xl.isMultipartUpload, xl.getLoadBalancedQuorumDisks()...)
|
||||
walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, walkerDoneCh)
|
||||
}
|
||||
// Collect uploads until we have reached maxUploads count to 0.
|
||||
for maxUploads > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user