tree-walk: optimize tree walk such that leaf detection of entries is delayed till the entry is sent on the treeWalkResult channel. (#2220)

This commit is contained in:
Krishna Srinivas 2016-07-18 03:46:52 +05:30 committed by Harshavardhana
parent aeac902747
commit 27a5b61f40
6 changed files with 224 additions and 60 deletions

View File

@ -72,8 +72,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirFactory(fs.isMultipartUpload, fs.storage)
walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, endWalkCh)
isLeaf := fs.isMultipartUpload
listDir := listDirFactory(isLeaf, fs.storage)
walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh)
}
for maxUploads > 0 {
walkResult, ok := <-walkResultCh

View File

@ -532,10 +532,14 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirFactory(func(bucket, object string) bool {
isLeaf := func(bucket, object string) bool {
// bucket argument is unused as we don't need to StatFile
// to figure if it's a file, just need to check that the
// object string does not end with "/".
return !strings.HasSuffix(object, slashSeparator)
}, fs.storage)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh)
}
listDir := listDirFactory(isLeaf, fs.storage)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
}
var fileInfos []FileInfo
var eof bool

View File

@ -37,43 +37,108 @@ type treeWalkResult struct {
end bool
}
// "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, err error)
// posix.ListDir returns entries with trailing "/" for directories. At the object layer
// we need to remove this trailing "/" for objects and retain "/" for prefixes before
// sorting because the trailing "/" can affect the sorting results for certain cases.
// Ex. lets say entries = ["a-b/", "a/"] and both are objects.
// sorting with out trailing "/" = ["a", "a-b"]
// sorting with trailing "/" = ["a-b/", "a/"]
// Hence if entries[] does not have a case like the above example then isLeaf() check
// can be delayed till the entry is pushed into the treeWalkResult channel.
// delayIsLeafCheck() returns true if isLeaf can be delayed or false if
// isLeaf should be done in listDir()
func delayIsLeafCheck(entries []string) bool {
for i, entry := range entries {
if i == len(entries)-1 {
break
}
// If any byte in the "entry" string is less than '/' then the
// next "entry" should not contain '/' at the same same byte position.
for j := 0; j < len(entry); j++ {
if entry[j] < '/' {
if len(entries[i+1]) > j {
if entries[i+1][j] == '/' {
return false
}
}
}
}
}
return true
}
// Returns function "listDir" of the type listDirFunc.
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
// disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks.
func listDirFactory(isLeaf func(string, string) bool, disks ...StorageAPI) listDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
// isLeaf is used to detect if an entry is a leaf entry. There are four scenarios where isLeaf
// should behave differently:
// Return entries that have prefix prefixEntry.
// Note: input entries are expected to be sorted.
func filterMatchingPrefix(entries []string, prefixEntry string) []string {
start := 0
end := len(entries)
for {
if start == end {
break
}
if strings.HasPrefix(entries[start], prefixEntry) {
break
}
start++
}
for {
if start == end {
break
}
if strings.HasPrefix(entries[end-1], prefixEntry) {
break
}
end--
}
return entries[start:end]
}
// "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error)
// A function isLeaf of type isLeafFunc is used to detect if an entry is a leaf entry. There are four scenarios
// where isLeaf should behave differently:
// 1. FS backend object listing - isLeaf is true if the entry has a trailing "/"
// 2. FS backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json
// 3. XL backend object listing - isLeaf is true if the entry is a directory and contains xl.json
// 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, err error) {
type isLeafFunc func(string, string) bool
// Returns function "listDir" of the type listDirFunc.
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
// disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks.
func listDirFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) {
for _, disk := range disks {
if disk == nil {
continue
}
entries, err = disk.ListDir(bucket, prefixDir)
if err == nil {
// Skip the entries which do not match the prefixEntry.
for i, entry := range entries {
if !strings.HasPrefix(entry, prefixEntry) {
entries[i] = ""
continue
// Listing needs to be sorted.
sort.Strings(entries)
// Filter entries that have the prefix prefixEntry.
entries = filterMatchingPrefix(entries, prefixEntry)
// Can isLeaf() check be delayed till when it has to be sent down the
// treeWalkResult channel?
delayIsLeaf = delayIsLeafCheck(entries)
if delayIsLeaf {
return entries, delayIsLeaf, nil
}
// isLeaf() check has to happen here so that trailing "/" for objects can be removed.
for i, entry := range entries {
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
entries[i] = strings.TrimSuffix(entry, slashSeparator)
}
}
// Sort again after removing trailing "/" for objects as the previous sort
// does not hold good anymore.
sort.Strings(entries)
// Skip the empty strings
for len(entries) > 0 && entries[0] == "" {
entries = entries[1:]
}
return entries, nil
return entries, delayIsLeaf, nil
}
// For any reason disk was deleted or goes offline, continue
// and list from other disks if possible.
@ -83,13 +148,13 @@ func listDirFactory(isLeaf func(string, string) bool, disks ...StorageAPI) listD
break
}
// Return error at the end.
return nil, err
return nil, false, err
}
return listDir
}
// treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files.
func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -104,7 +169,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
markerBase = markerSplit[1]
}
}
entries, err := listDir(bucket, prefixDir, entryPrefixMatch)
entries, delayIsLeaf, err := listDir(bucket, prefixDir, entryPrefixMatch)
if err != nil {
select {
case <-endWalkCh:
@ -130,6 +195,11 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
return nil
}
for i, entry := range entries {
// Decision to do isLeaf check was pushed from listDir() to here.
if delayIsLeaf && isLeaf(bucket, pathJoin(prefixDir, entry)) {
entry = strings.TrimSuffix(entry, slashSeparator)
}
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.
@ -156,7 +226,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
// true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
if tErr := doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, resultCh, endWalkCh, markIsEnd); tErr != nil {
if tErr := doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
return tErr
}
continue
@ -175,7 +245,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
}
// Initiate a new treeWalk in a goroutine.
func startTreeWalk(bucket, prefix, marker string, recursive bool, listDir listDirFunc, endWalkCh chan struct{}) chan treeWalkResult {
func startTreeWalk(bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, endWalkCh chan struct{}) chan treeWalkResult {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@ -197,7 +267,7 @@ func startTreeWalk(bucket, prefix, marker string, recursive bool, listDir listDi
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
isEnd := true // Indication to start walking the tree with end as true.
doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, resultCh, endWalkCh, isEnd)
doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, resultCh, endWalkCh, isEnd)
close(resultCh)
}()
return resultCh

View File

@ -19,6 +19,7 @@ package main
import (
"fmt"
"io/ioutil"
"reflect"
"sort"
"strings"
"testing"
@ -28,6 +29,84 @@ import (
// Fixed volume name that could be used across tests
const volume = "testvolume"
// Test for delayIsLeafCheck.
func TestDelayIsLeafCheck(t *testing.T) {
testCases := []struct {
entries []string
delay bool
}{
// Test cases where isLeaf check can't be delayed.
{
[]string{"a-b/", "a/"},
false,
},
{
[]string{"a%b/", "a/"},
false,
},
{
[]string{"a-b-c", "a-b/"},
false,
},
// Test cases where isLeaf check can be delayed.
{
[]string{"a-b/", "aa/"},
true,
},
{
[]string{"a", "a-b"},
true,
},
{
[]string{"aaa", "bbb"},
true,
},
}
for i, testCase := range testCases {
expected := testCase.delay
got := delayIsLeafCheck(testCase.entries)
if expected != got {
t.Errorf("Test %d : Expected %t got %t", i+1, expected, got)
}
}
}
// Test for filterMatchingPrefix.
func TestFilterMatchingPrefix(t *testing.T) {
entries := []string{"a", "aab", "ab", "abbbb", "zzz"}
testCases := []struct {
prefixEntry string
result []string
}{
{
// Empty prefix should match all entries.
"",
[]string{"a", "aab", "ab", "abbbb", "zzz"},
},
{
"a",
[]string{"a", "aab", "ab", "abbbb"},
},
{
"aa",
[]string{"aab"},
},
{
// Does not match any of the entries.
"c",
[]string{},
},
}
for i, testCase := range testCases {
expected := testCase.result
got := filterMatchingPrefix(entries, testCase.prefixEntry)
if !reflect.DeepEqual(expected, got) {
t.Errorf("Test %d : expected %v, got %v", i+1, expected, got)
}
}
}
// Helper function that creates a volume and files in it.
func createNamespace(disk StorageAPI, volume string, files []string) error {
// Make a volume.
@ -48,11 +127,11 @@ func createNamespace(disk StorageAPI, volume string, files []string) error {
// Test if tree walker returns entries matching prefix alone are received
// when a non empty prefix is supplied.
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc) {
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) {
// Start the tree walk go-routine.
prefix := "d/"
endWalkCh := make(chan struct{})
twResultCh := startTreeWalk(volume, prefix, "", true, listDir, endWalkCh)
twResultCh := startTreeWalk(volume, prefix, "", true, listDir, isLeaf, endWalkCh)
// Check if all entries received on the channel match the prefix.
for res := range twResultCh {
@ -63,11 +142,11 @@ func testTreeWalkPrefix(t *testing.T, listDir listDirFunc) {
}
// Test if entries received on tree walk's channel appear after the supplied marker.
func testTreeWalkMarker(t *testing.T, listDir listDirFunc) {
func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) {
// Start the tree walk go-routine.
prefix := ""
endWalkCh := make(chan struct{})
twResultCh := startTreeWalk(volume, prefix, "d/g", true, listDir, endWalkCh)
twResultCh := startTreeWalk(volume, prefix, "d/g", true, listDir, isLeaf, endWalkCh)
// Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel.
expectedCount := 3
@ -103,13 +182,14 @@ func TestTreeWalk(t *testing.T) {
t.Fatal(err)
}
listDir := listDirFactory(func(volume, prefix string) bool {
isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator)
}, disk)
}
listDir := listDirFactory(isLeaf, disk)
// Simple test for prefix based walk.
testTreeWalkPrefix(t, listDir)
testTreeWalkPrefix(t, listDir, isLeaf)
// Simple test when marker is set.
testTreeWalkMarker(t, listDir)
testTreeWalkMarker(t, listDir, isLeaf)
err = removeAll(fsDir)
if err != nil {
t.Fatal(err)
@ -136,9 +216,10 @@ func TestTreeWalkTimeout(t *testing.T) {
t.Fatal(err)
}
listDir := listDirFactory(func(volume, prefix string) bool {
isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator)
}, disk)
}
listDir := listDirFactory(isLeaf, disk)
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
pool := newTreeWalkPool(2 * time.Second)
@ -147,7 +228,7 @@ func TestTreeWalkTimeout(t *testing.T) {
prefix := ""
marker := ""
recursive := true
resultCh := startTreeWalk(volume, prefix, marker, recursive, listDir, endWalkCh)
resultCh := startTreeWalk(volume, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
params := listParams{
bucket: volume,
@ -222,7 +303,7 @@ func TestListDir(t *testing.T) {
}
// Should list "file1" from fsDir1.
entries, err := listDir(volume, "", "")
entries, _, err := listDir(volume, "", "")
if err != nil {
t.Error(err)
}
@ -240,7 +321,7 @@ func TestListDir(t *testing.T) {
}
// Should list "file2" from fsDir2.
entries, err = listDir(volume, "", "")
entries, _, err = listDir(volume, "", "")
if err != nil {
t.Error(err)
}
@ -255,7 +336,7 @@ func TestListDir(t *testing.T) {
t.Error(err)
}
// None of the disks are available, should get errDiskNotFound.
entries, err = listDir(volume, "", "")
entries, _, err = listDir(volume, "", "")
if err != errDiskNotFound {
t.Error("expected errDiskNotFound error.")
}
@ -276,10 +357,13 @@ func TestRecursiveTreeWalk(t *testing.T) {
t.Errorf("Unable to create StorageAPI: %s", err)
}
// Create listDir function.
listDir := listDirFactory(func(volume, prefix string) bool {
// Simple isLeaf check, returns true if there is no trailing "/"
isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator)
}, disk1)
}
// Create listDir function.
listDir := listDirFactory(isLeaf, disk1)
// Create the namespace.
var files = []string{
@ -355,7 +439,7 @@ func TestRecursiveTreeWalk(t *testing.T) {
for i, testCase := range testCases {
for entry := range startTreeWalk(volume,
testCase.prefix, testCase.marker, testCase.recursive,
listDir, endWalkCh) {
listDir, isLeaf, endWalkCh) {
if _, found := testCase.expected[entry.entry]; !found {
t.Errorf("Test %d: Expected %s, but couldn't find", i+1, entry.entry)
}
@ -380,10 +464,12 @@ func TestSortedness(t *testing.T) {
t.Errorf("Unable to create StorageAPI: %s", err)
}
// Create listDir function.
listDir := listDirFactory(func(volume, prefix string) bool {
// Simple isLeaf check, returns true if there is no trailing "/"
isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator)
}, disk1)
}
// Create listDir function.
listDir := listDirFactory(isLeaf, disk1)
// Create the namespace.
var files = []string{
@ -425,7 +511,7 @@ func TestSortedness(t *testing.T) {
var actualEntries []string
for entry := range startTreeWalk(volume,
test.prefix, test.marker, test.recursive,
listDir, endWalkCh) {
listDir, isLeaf, endWalkCh) {
actualEntries = append(actualEntries, entry.entry)
}
if !sort.IsSorted(sort.StringSlice(actualEntries)) {
@ -453,10 +539,11 @@ func TestTreeWalkIsEnd(t *testing.T) {
t.Errorf("Unable to create StorageAPI: %s", err)
}
// Create listDir function.
listDir := listDirFactory(func(volume, prefix string) bool {
isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator)
}, disk1)
}
// Create listDir function.
listDir := listDirFactory(isLeaf, disk1)
// Create the namespace.
var files = []string{
@ -497,7 +584,7 @@ func TestTreeWalkIsEnd(t *testing.T) {
}
for i, test := range testCases {
var entry treeWalkResult
for entry = range startTreeWalk(volume, test.prefix, test.marker, test.recursive, listDir, endWalkCh) {
for entry = range startTreeWalk(volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, endWalkCh) {
}
if entry.entry != test.expectedEntry {
t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry)

View File

@ -29,8 +29,9 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirFactory(xl.isObject, xl.getLoadBalancedQuorumDisks()...)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, endWalkCh)
isLeaf := xl.isObject
listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
}
var objInfos []ObjectInfo

View File

@ -90,8 +90,9 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walkerCh == nil {
walkerDoneCh = make(chan struct{})
listDir := listDirFactory(xl.isMultipartUpload, xl.getLoadBalancedQuorumDisks()...)
walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, walkerDoneCh)
isLeaf := xl.isMultipartUpload
listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...)
walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
}
// Collect uploads until we have reached maxUploads count to 0.
for maxUploads > 0 {