From 247e835d7bb01d3592765946e8296b494a0aa846 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Fri, 6 May 2016 01:21:56 +0530 Subject: [PATCH] object: move go-routine listing from posix to objectLayer. (#1491) --- fs-objects-multipart.go | 5 +- fs-objects.go | 83 ++++- object-api-listobjects_test.go | 13 +- object-common-multipart.go | 295 +++++++++--------- posix-dir-common.go | 228 -------------- posix-dir-nix.go => posix-list-dir-nix.go | 89 +++--- ...-dir-others.go => posix-list-dir-others.go | 48 ++- posix.go | 208 ++---------- rpc-client.go | 51 ++- rpc-server-datatypes.go | 21 +- rpc-server.go | 36 +-- storage-api-interface.go | 2 +- tree-walk.go | 284 +++++++++++++++++ xl-erasure-v1.go | 262 ++-------------- xl-objects-multipart.go | 2 +- xl-objects.go | 147 ++++++--- 16 files changed, 767 insertions(+), 1007 deletions(-) delete mode 100644 posix-dir-common.go rename posix-dir-nix.go => posix-list-dir-nix.go (60%) rename posix-dir-others.go => posix-list-dir-others.go (55%) create mode 100644 tree-walk.go diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index 2c714b8fd..f5387a948 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -24,7 +24,7 @@ import ( // ListMultipartUploads - list multipart uploads. func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - return listMultipartUploadsCommon(fs.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + return listMultipartUploadsCommon(fs, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // NewMultipartUpload - initialize a new multipart upload, returns a unique id. @@ -100,8 +100,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } // Cleanup all the parts. - recursive := false - if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID, recursive); err != nil { + if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { return "", err } diff --git a/fs-objects.go b/fs-objects.go index f10641e96..77369d170 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -20,15 +20,20 @@ import ( "io" "path/filepath" "strings" + "sync" + "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/mimedb" ) // fsObjects - Implements fs object layer. type fsObjects struct { - storage StorageAPI + storage StorageAPI + listObjectMap map[listParams][]*treeWalker + listObjectMapMutex *sync.Mutex } +// FIXME: constructor should return a pointer. // newFSObjects - initialize new fs object layer. func newFSObjects(exportPath string) (ObjectLayer, error) { var storage StorageAPI @@ -51,7 +56,11 @@ func newFSObjects(exportPath string) (ObjectLayer, error) { cleanupAllTmpEntries(storage) // Return successfully initialized object layer. - return fsObjects{storage}, nil + return fsObjects{ + storage: storage, + listObjectMap: make(map[listParams][]*treeWalker), + listObjectMapMutex: &sync.Mutex{}, + }, nil } /// Bucket operations @@ -151,6 +160,12 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey if !IsValidBucketName(bucket) { return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} } + // Verify whether the bucket exists. + if isExist, err := isBucketExist(fs.storage, bucket); err != nil { + return ListObjectsInfo{}, err + } else if !isExist { + return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + } if !IsValidObjectPrefix(prefix) { return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} } @@ -170,17 +185,71 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey } } + if maxKeys == 0 { + return ListObjectsInfo{}, nil + } + + // Over flowing count - reset to maxObjectList. + if maxKeys < 0 || maxKeys > maxObjectList { + maxKeys = maxObjectList + } + // Default is recursive, if delimiter is set then list non recursive. recursive := true if delimiter == slashSeparator { recursive = false } - fileInfos, eof, err := fs.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) - if err != nil { - return ListObjectsInfo{}, toObjectErr(err, bucket) + + walker := lookupTreeWalk(fs, listParams{bucket, recursive, marker, prefix}) + if walker == nil { + walker = startTreeWalk(fs, bucket, prefix, marker, recursive) } - if maxKeys == 0 { - return ListObjectsInfo{}, nil + var fileInfos []FileInfo + var eof bool + var nextMarker string + log.Debugf("Reading from the tree walk channel has begun.") + for i := 0; i < maxKeys; { + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break + } + // For any walk error return right away. + if walkResult.err != nil { + log.WithFields(logrus.Fields{ + "bucket": bucket, + "prefix": prefix, + "marker": marker, + "recursive": recursive, + }).Debugf("Walk resulted in an error %s", walkResult.err) + // File not found is a valid case. + if walkResult.err == errFileNotFound { + return ListObjectsInfo{}, nil + } + return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) + } + fileInfo := walkResult.fileInfo + nextMarker = fileInfo.Name + fileInfos = append(fileInfos, fileInfo) + if walkResult.end { + eof = true + break + } + i++ + } + if len(fileInfos) == 0 { + eof = true + } + params := listParams{bucket, recursive, nextMarker, prefix} + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("Save the tree walk into map for subsequent requests.") + if !eof { + saveTreeWalk(fs, params, walker) } result := ListObjectsInfo{IsTruncated: !eof} diff --git a/object-api-listobjects_test.go b/object-api-listobjects_test.go index 90310f467..04836bf6b 100644 --- a/object-api-listobjects_test.go +++ b/object-api-listobjects_test.go @@ -535,6 +535,7 @@ func TestListObjects(t *testing.T) { t.Errorf("Test %d: Expected to pass, but failed with: %s", i+1, err.Error()) } if err == nil && !testCase.shouldPass { + t.Log(result) t.Errorf("Test %d: Expected to fail with \"%s\", but passed instead", i+1, testCase.err.Error()) } // Failed as expected, but does it fail for the expected reason. @@ -543,11 +544,15 @@ func TestListObjects(t *testing.T) { t.Errorf("Test %d: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead", i+1, testCase.err.Error(), err.Error()) } } - // Since there are cases for which ListObjects fails, this is necessary. - // Test passes as expected, but the output values are verified for correctness here. + // Since there are cases for which ListObjects fails, this is + // necessary. Test passes as expected, but the output values + // are verified for correctness here. if err == nil && testCase.shouldPass { - // The length of the expected ListObjectsResult.Objects should match in both expected result from test cases and in the output. - // On failure calling t.Fatalf, otherwise it may lead to index out of range error in assertion following this. + // The length of the expected ListObjectsResult.Objects + // should match in both expected result from test cases + // and in the output. On failure calling t.Fatalf, + // otherwise it may lead to index out of range error in + // assertion following this. if len(testCase.result.Objects) != len(result.Objects) { t.Fatalf("Test %d: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, len(testCase.result.Objects), len(result.Objects)) } diff --git a/object-common-multipart.go b/object-common-multipart.go index 00912b3f8..ba9ab0a4e 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -23,9 +23,9 @@ import ( "io" "io/ioutil" "path" + "sort" "strconv" "strings" - "sync" "github.com/Sirupsen/logrus" "github.com/skyrings/skyring-common/tools/uuid" @@ -164,37 +164,24 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa // Cleanup all temp entries inside tmpMetaPrefix directory, upon server initialization. func cleanupAllTmpEntries(storage StorageAPI) error { - recursive := true // Recursively delete all files inside 'tmp' directory. - return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "", recursive) + return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "") } // Wrapper to which removes all the uploaded parts after a successful // complete multipart upload. -func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string, recursive bool) error { - markerPath := "" - var wg = &sync.WaitGroup{} - for { - uploadIDPath := path.Join(prefix, bucket, object, uploadID) - fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, recursive, 1000) - if err != nil { - return toObjectErr(err, bucket, object) - } - // Loop through all files and delete each in go-routine, while - // adding each operation to a wait group. - for _, fileInfo := range fileInfos { - wg.Add(1) - go func(fi FileInfo) { - defer wg.Done() - storage.DeleteFile(minioMetaBucket, fi.Name) - }(fileInfo) - } - if eof { - break - } - markerPath = fileInfos[len(fileInfos)-1].Name +func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error { + multipartDir := path.Join(prefix, bucket, object) + entries, err := storage.ListDir(minioMetaBucket, multipartDir) + if err != nil { + return err + } + for _, entry := range entries { + if strings.HasPrefix(entry, uploadID) { + if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil { + return err + } + } } - // Wait for all the routines. - wg.Wait() return nil } @@ -213,47 +200,40 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str } else if !status { return InvalidUploadID{UploadID: uploadID} } - recursive := false // Cleanup all the top level files and folders matching uploadID. - return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID, recursive) + return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID) } // listLeafEntries - lists all entries if a given prefixPath is a leaf // directory, returns error if any - returns empty list if prefixPath // is not a leaf directory. -func listLeafEntries(storage StorageAPI, prefixPath string) (entries []FileInfo, e error) { - var markerPath string - for { - fileInfos, eof, err := storage.ListFiles(minioMetaBucket, prefixPath, markerPath, false, 1000) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - }).Errorf("%s", err) - return nil, err - } - for _, fileInfo := range fileInfos { - // Set marker for next batch of ListFiles. - markerPath = fileInfo.Name - if fileInfo.Mode.IsDir() { - // If a directory is found, doesn't return anything. - return nil, nil - } - fileName := path.Base(fileInfo.Name) - if !strings.Contains(fileName, ".") { - // Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum - // and retain entries of the pattern bucket/object/uploadID - entries = append(entries, fileInfo) - } - } - if eof { - break +func listLeafEntries(storage StorageAPI, prefixPath string) (entries []string, err error) { + entries, err = storage.ListDir(minioMetaBucket, prefixPath) + if err != nil { + return nil, err + } + for _, entry := range entries { + if strings.HasSuffix(entry, slashSeparator) { + return nil, nil } } return entries, nil } // listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket. -func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) { +func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPath string, recursive bool, maxKeys int) (fileInfos []FileInfo, eof bool, err error) { + var storage StorageAPI + switch l := layer.(type) { + case fsObjects: + storage = l.storage + case xlObjects: + storage = l.storage + } + + walker := lookupTreeWalk(layer, listParams{minioMetaBucket, recursive, markerPath, prefixPath}) + if walker == nil { + walker = startTreeWalk(layer, minioMetaBucket, prefixPath, markerPath, recursive) + } + // newMaxKeys tracks the size of entries which are going to be // returned back. var newMaxKeys int @@ -261,64 +241,53 @@ func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerP // Following loop gathers and filters out special files inside // minio meta volume. for { - var fileInfos []FileInfo - // List files up to maxKeys-newMaxKeys, since we are skipping - // entries for special files. - fileInfos, eof, err = storage.ListFiles(minioMetaBucket, prefixPath, markerPath, recursive, maxKeys-newMaxKeys) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - "recursive": recursive, - "maxKeys": maxKeys, - }).Errorf("%s", err) - return nil, true, err + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break } - // Loop through and validate individual file. - for _, fi := range fileInfos { - var entries []FileInfo - if fi.Mode.IsDir() { - // List all the entries if fi.Name is a leaf directory, if - // fi.Name is not a leaf directory then the resulting - // entries are empty. - entries, err = listLeafEntries(storage, fi.Name) + // For any walk error return right away. + if walkResult.err != nil { + log.WithFields(logrus.Fields{ + "bucket": minioMetaBucket, + "prefix": prefixPath, + "marker": markerPath, + "recursive": recursive, + }).Debugf("Walk resulted in an error %s", walkResult.err) + // File not found is a valid case. + if walkResult.err == errFileNotFound { + return nil, true, nil + } + return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath) + } + fi := walkResult.fileInfo + var entries []string + if fi.Mode.IsDir() { + // List all the entries if fi.Name is a leaf directory, if + // fi.Name is not a leaf directory then the resulting + // entries are empty. + entries, err = listLeafEntries(storage, fi.Name) + if err != nil { + log.WithFields(logrus.Fields{ + "prefixPath": fi.Name, + }).Errorf("%s", err) + return nil, false, err + } + } + if len(entries) > 0 { + // We reach here for non-recursive case and a leaf entry. + sort.Strings(entries) + for _, entry := range entries { + if strings.ContainsRune(entry, '.') { + continue + } + var fileInfo FileInfo + fileInfo, err = storage.StatFile(minioMetaBucket, path.Join(fi.Name, entry)) if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": fi.Name, - }).Errorf("%s", err) return nil, false, err } - } - // Set markerPath for next batch of listing. - markerPath = fi.Name - if len(entries) > 0 { - // We reach here for non-recursive case and a leaf entry. - for _, entry := range entries { - allFileInfos = append(allFileInfos, entry) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by fs.storage.ListFiles() - // error : nil - return - } - } - } else { - // We reach here for a non-recursive case non-leaf entry - // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] - if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. - // Skip files matching pattern bucket/object/uploadID.partNum.md5sum - // and retain files matching pattern bucket/object/uploadID - specialFile := path.Base(fi.Name) - if strings.Contains(specialFile, ".") { - // Contains partnumber and md5sum info, skip this. - continue - } - } - allFileInfos = append(allFileInfos, fi) + fileInfos = append(fileInfos, fileInfo) newMaxKeys++ // If we have reached the maxKeys, it means we have listed // everything that was requested. Return right here. @@ -330,20 +299,39 @@ func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerP return } } - } - // If we have reached eof then we break out. - if eof { - break + } else { + // We reach here for a non-recursive case non-leaf entry + // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] + if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. + // Skip files matching pattern bucket/object/uploadID.partNum.md5sum + // and retain files matching pattern bucket/object/uploadID + specialFile := path.Base(fi.Name) + if strings.Contains(specialFile, ".") { + // Contains partnumber and md5sum info, skip this. + continue + } + } + fileInfos = append(fileInfos, fi) + newMaxKeys++ + // If we have reached the maxKeys, it means we have listed + // everything that was requested. Return right here. + if newMaxKeys == maxKeys { + // Return values: + // allFileInfos : "maxKeys" number of entries. + // eof : eof returned by fs.storage.ListFiles() + // error : nil + return + } } } // Return entries here. - return allFileInfos, eof, nil + return fileInfos, eof, nil } // listMultipartUploadsCommon - lists all multipart uploads, common // function for both object layers. -func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { +func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { result := ListMultipartsInfo{} // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -401,7 +389,7 @@ func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, u } // List all the multipart files at prefixPath, starting with marker keyMarkerPath. - fileInfos, eof, err := listMetaBucketMultipartFiles(storage, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads) + fileInfos, eof, err := listMetaBucketMultipartFiles(layer, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads) if err != nil { log.WithFields(logrus.Fields{ "prefixPath": multipartPrefixPath, @@ -446,61 +434,60 @@ func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, u func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListPartsInfo{}, (BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + return ListPartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object}) } if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { return ListPartsInfo{}, err } else if !status { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID}) } result := ListPartsInfo{} - var markerPath string - nextPartNumberMarker := 0 - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - // Figure out the marker for the next subsequent calls, if the - // partNumberMarker is already set. - if partNumberMarker > 0 { - partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "." - fileInfos, _, err := storage.ListFiles(minioMetaBucket, partNumberMarkerPath, "", false, 1) - if err != nil { - return result, toObjectErr(err, minioMetaBucket, partNumberMarkerPath) - } - if len(fileInfos) == 0 { - return result, InvalidPart{} - } - markerPath = fileInfos[0].Name - } - uploadIDPrefix := uploadIDPath + "." - fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPrefix, markerPath, false, maxParts) + entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) if err != nil { - return result, InvalidPart{} + return result, err } - for _, fileInfo := range fileInfos { - fileName := path.Base(fileInfo.Name) - splitResult := strings.Split(fileName, ".") - partNum, err := strconv.Atoi(splitResult[1]) - if err != nil { - return result, err + sort.Strings(entries) + var newEntries []string + for _, entry := range entries { + if !strings.Contains(entry, ".") { + continue + } + if !strings.HasPrefix(entry, uploadID) { + continue + } + newEntries = append(newEntries, entry) + } + idx := sort.SearchStrings(newEntries, fmt.Sprintf("%s.%.5d.", uploadID, partNumberMarker+1)) + newEntries = newEntries[idx:] + count := maxParts + for _, entry := range newEntries { + fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, entry)) + splitEntry := strings.Split(entry, ".") + partNum, err := strconv.Atoi(splitEntry[1]) + if err != nil { + return ListPartsInfo{}, err } - md5sum := splitResult[2] result.Parts = append(result.Parts, partInfo{ PartNumber: partNum, - LastModified: fileInfo.ModTime, - ETag: md5sum, - Size: fileInfo.Size, + LastModified: fi.ModTime, + ETag: splitEntry[2], + Size: fi.Size, }) - nextPartNumberMarker = partNum + count-- + if count == 0 { + break + } + } + if len(newEntries) > len(result.Parts) { + result.IsTruncated = true } result.Bucket = bucket result.Object = object result.UploadID = uploadID - result.PartNumberMarker = partNumberMarker - result.NextPartNumberMarker = nextPartNumberMarker result.MaxParts = maxParts - result.IsTruncated = !eof return result, nil } diff --git a/posix-dir-common.go b/posix-dir-common.go deleted file mode 100644 index 933399ac5..000000000 --- a/posix-dir-common.go +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "os" - "path" - "sort" - "strings" - "time" -) - -// fsDirent carries directory entries. -type fsDirent struct { - name string - modTime time.Time // On Solaris and older unix distros this is empty. - size int64 // On Solaris and older unix distros this is empty. - mode os.FileMode -} - -// IsDir - returns true if fsDirent is a directory -func (ent fsDirent) IsDir() bool { - return ent.mode.IsDir() -} - -// IsSymlink - returns true if fsDirent is a symbolic link -func (ent fsDirent) IsSymlink() bool { - return ent.mode&os.ModeSymlink == os.ModeSymlink -} - -// IsRegular - returns true if fsDirent is a regular file -func (ent fsDirent) IsRegular() bool { - return ent.mode.IsRegular() -} - -// byDirentName is a collection satisfying sort.Interface. -type byDirentName []fsDirent - -func (d byDirentName) Len() int { return len(d) } -func (d byDirentName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } -func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name } - -// Using sort.Search() internally to jump to the file entry containing -// the prefix. -func searchDirents(dirents []fsDirent, x string) int { - processFunc := func(i int) bool { - return dirents[i].name >= x - } - return sort.Search(len(dirents), processFunc) -} - -// Tree walk result carries results of tree walking. -type treeWalkResult struct { - fileInfo FileInfo - err error - end bool -} - -// Tree walk notify carries a channel which notifies tree walk -// results, additionally it also carries information if treeWalk -// should be timedOut. -type treeWalker struct { - ch <-chan treeWalkResult - timedOut bool -} - -// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool { - // Example: - // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively - // called with prefixDir="one/two/three/four/" and marker="five.txt" - - // Convert dirent to FileInfo - direntToFileInfo := func(dirent fsDirent) (FileInfo, error) { - fileInfo := FileInfo{} - // Convert to full object name. - fileInfo.Name = path.Join(prefixDir, dirent.name) - if dirent.modTime.IsZero() && dirent.size == 0 { - // ModifiedTime and Size are zero, Stat() and figure out - // the actual values that need to be set. - fi, err := os.Stat(path.Join(bucketDir, prefixDir, dirent.name)) - if err != nil { - return FileInfo{}, err - } - // Fill size and modtime. - fileInfo.ModTime = fi.ModTime() - fileInfo.Size = fi.Size() - fileInfo.Mode = fi.Mode() - } else { - // If ModTime or Size are set then use them - // without attempting another Stat operation. - fileInfo.ModTime = dirent.modTime - fileInfo.Size = dirent.size - fileInfo.Mode = dirent.mode - } - if fileInfo.Mode.IsDir() { - // Add "/" suffix again for directories as - fileInfo.Size = 0 - fileInfo.Name += "/" - } - return fileInfo, nil - } - - var markerBase, markerDir string - if marker != "" { - // Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt" - markerSplit := strings.SplitN(marker, slashSeparator, 2) - markerDir = markerSplit[0] - if len(markerSplit) == 2 { - markerDir += slashSeparator - markerBase = markerSplit[1] - } - } - - // Entry prefix match function. - prefixMatchFn := func(dirent fsDirent) bool { - if dirent.IsDir() || dirent.IsRegular() { - // Does dirent name has reserved prefixes or suffixes. - hasReserved := hasReservedPrefix(dirent.name) || hasReservedSuffix(dirent.name) - // All dirents which match prefix and do not have reserved - // keywords in them are valid entries. - return strings.HasPrefix(dirent.name, entryPrefixMatch) && !hasReserved && isValidPath(dirent.name) - } - return false - } - - // scandir returns entries that begins with entryPrefixMatch - dirents, err := scandir(path.Join(bucketDir, prefixDir), prefixMatchFn, true) - if err != nil { - send(treeWalkResult{err: err}) - return false - } - - // example: - // If markerDir="four/" searchDirents() returns the index of "four/" in the sorted - // dirents list. We skip all the dirent entries till "four/" - dirents = dirents[searchDirents(dirents, markerDir):] - *count += len(dirents) - for i, dirent := range dirents { - if i == 0 && markerDir == dirent.name && !dirent.IsDir() { - // If the first entry is not a directory - // we need to skip this entry. - *count-- - continue - } - if dirent.IsDir() && recursive { - // If the entry is a directory, we will need recurse into it. - markerArg := "" - if dirent.name == markerDir { - // We need to pass "five.txt" as marker only if we are - // recursing into "four/" - markerArg = markerBase - } - *count-- - if !treeWalk(bucketDir, path.Join(prefixDir, dirent.name), "", markerArg, recursive, send, count) { - return false - } - continue - } - fileInfo, err := direntToFileInfo(dirent) - if err != nil { - send(treeWalkResult{err: err}) - return false - } - *count-- - if !send(treeWalkResult{fileInfo: fileInfo}) { - return false - } - } - return true -} - -// Initiate a new treeWalk in a goroutine. -func startTreeWalk(fsPath, bucket, prefix, marker string, recursive bool) *treeWalker { - // Example 1 - // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" - // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" - // and entryPrefixMatch="" - - // Example 2 - // if prefix is "one/two/th" and marker is "one/two/three/four/five.txt" - // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" - // and entryPrefixMatch="th" - ch := make(chan treeWalkResult, fsListLimit) - walkNotify := treeWalker{ch: ch} - entryPrefixMatch := prefix - prefixDir := "" - lastIndex := strings.LastIndex(prefix, slashSeparator) - if lastIndex != -1 { - entryPrefixMatch = prefix[lastIndex+1:] - prefixDir = prefix[:lastIndex+1] - } - count := 0 - marker = strings.TrimPrefix(marker, prefixDir) - go func() { - defer close(ch) - send := func(walkResult treeWalkResult) bool { - if count == 0 { - walkResult.end = true - } - timer := time.After(time.Second * 60) - select { - case ch <- walkResult: - return true - case <-timer: - walkNotify.timedOut = true - return false - } - } - bucketDir := path.Join(fsPath, bucket) - treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker, recursive, send, &count) - }() - return &walkNotify -} diff --git a/posix-dir-nix.go b/posix-list-dir-nix.go similarity index 60% rename from posix-dir-nix.go rename to posix-list-dir-nix.go index 17c748b75..1991428e4 100644 --- a/posix-dir-nix.go +++ b/posix-list-dir-nix.go @@ -22,9 +22,11 @@ import ( "os" "path" "runtime" - "sort" + "strings" "syscall" "unsafe" + + "github.com/Sirupsen/logrus" ) const ( @@ -47,9 +49,8 @@ func clen(n []byte) int { // parseDirents - inspired from // https://golang.org/src/syscall/syscall_.go -func parseDirents(dirPath string, buf []byte) []fsDirent { +func parseDirents(dirPath string, buf []byte) (entries []string, err error) { bufidx := 0 - dirents := []fsDirent{} for bufidx < len(buf) { dirent := (*syscall.Dirent)(unsafe.Pointer(&buf[bufidx])) // On non-Linux operating systems for rec length of zero means @@ -58,7 +59,7 @@ func parseDirents(dirPath string, buf []byte) []fsDirent { break } bufidx += int(dirent.Reclen) - // Skip dirents if they are absent in directory. + // Skip if they are absent in directory. if isEmptyDirent(dirent) { continue } @@ -69,56 +70,57 @@ func parseDirents(dirPath string, buf []byte) []fsDirent { continue } - var mode os.FileMode switch dirent.Type { - case syscall.DT_BLK, syscall.DT_WHT: - mode = os.ModeDevice - case syscall.DT_CHR: - mode = os.ModeDevice | os.ModeCharDevice case syscall.DT_DIR: - mode = os.ModeDir - case syscall.DT_FIFO: - mode = os.ModeNamedPipe - case syscall.DT_LNK: - mode = os.ModeSymlink + entries = append(entries, name+slashSeparator) case syscall.DT_REG: - mode = 0 - case syscall.DT_SOCK: - mode = os.ModeSocket + entries = append(entries, name) case syscall.DT_UNKNOWN: // On Linux XFS does not implement d_type for on disk // format << v5. Fall back to Stat(). - if fi, err := os.Stat(path.Join(dirPath, name)); err == nil { - mode = fi.Mode() + var fi os.FileInfo + if fi, err = os.Stat(path.Join(dirPath, name)); err == nil { + if fi.IsDir() { + entries = append(entries, fi.Name()+slashSeparator) + } else if fi.Mode().IsRegular() { + entries = append(entries, fi.Name()) + } } else { - // Caller listing would fail, if Stat failed but we - // won't crash the server. - mode = 0xffffffff + // This is unexpected. + return } + default: + // Skip entries which are not file or directory. + // FIXME: should we handle symlinks? + continue } - - dirents = append(dirents, fsDirent{ - name: name, - mode: mode, - }) } - return dirents + return } -// scans the directory dirPath, calling filter() on each directory -// entry. Entries for which filter() returns true are stored, lexically -// sorted using sort.Sort(). If filter is NULL, all entries are selected. -// If namesOnly is true, dirPath is not appended into entry name. -func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) { +// Return all the entries at the directory dirPath. +func readDir(dirPath string) (entries []string, err error) { buf := make([]byte, readDirentBufSize) d, err := os.Open(dirPath) if err != nil { + log.WithFields(logrus.Fields{ + "dirPath": dirPath, + }).Debugf("Open failed with %s", err) + + // File is really not found. + if os.IsNotExist(err) { + return nil, errFileNotFound + } + + // File path cannot be verified since one of the parents is a file. + if strings.Contains(err.Error(), "not a directory") { + return nil, errFileNotFound + } return nil, err } defer d.Close() fd := int(d.Fd()) - dirents := []fsDirent{} for { nbuf, err := syscall.ReadDirent(fd, buf) if err != nil { @@ -127,20 +129,11 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi if nbuf <= 0 { break } - for _, dirent := range parseDirents(dirPath, buf[:nbuf]) { - if !namesOnly { - dirent.name = path.Join(dirPath, dirent.name) - } - if dirent.IsDir() { - dirent.name += "/" - dirent.size = 0 - } - if filter == nil || filter(dirent) { - dirents = append(dirents, dirent) - } + var tmpEntries []string + if tmpEntries, err = parseDirents(dirPath, buf[:nbuf]); err != nil { + return nil, err } + entries = append(entries, tmpEntries...) } - - sort.Sort(byDirentName(dirents)) - return dirents, nil + return } diff --git a/posix-dir-others.go b/posix-list-dir-others.go similarity index 55% rename from posix-dir-others.go rename to posix-list-dir-others.go index f8b34b466..25d74a714 100644 --- a/posix-dir-others.go +++ b/posix-list-dir-others.go @@ -21,22 +21,32 @@ package main import ( "io" "os" - "path" - "sort" + "strings" + + "github.com/Sirupsen/logrus" ) -// scans the directory dirPath, calling filter() on each directory -// entry. Entries for which filter() returns true are stored, lexically -// sorted using sort.Sort(). If filter is NULL, all entries are selected. -// If namesOnly is true, dirPath is not appended into entry name. -func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) { +// Return all the entries at the directory dirPath. +func readDir(dirPath string) (entries []string, err error) { d, err := os.Open(dirPath) if err != nil { + log.WithFields(logrus.Fields{ + "dirPath": dirPath, + }).Debugf("Open failed with %s", err) + + // File is really not found. + if os.IsNotExist(err) { + return nil, errFileNotFound + } + + // File path cannot be verified since one of the parents is a file. + if strings.Contains(err.Error(), "not a directory") { + return nil, errFileNotFound + } return nil, err } defer d.Close() - var dirents []fsDirent for { fis, err := d.Readdir(1000) if err != nil { @@ -46,25 +56,13 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi return nil, err } for _, fi := range fis { - dirent := fsDirent{ - name: fi.Name(), - modTime: fi.ModTime(), - size: fi.Size(), - mode: fi.Mode(), - } - if !namesOnly { - dirent.name = path.Join(dirPath, dirent.name) - } - if dirent.IsDir() { + if fi.Mode().IsDir() { // append "/" instead of "\" so that sorting is done as expected. - dirent.name += slashSeparator - } - if filter == nil || filter(dirent) { - dirents = append(dirents, dirent) + entries = append(entries, fi.Name()+slashSeparator) + } else if fi.Mode().IsRegular() { + entries = append(entries, fi.Name()) } } } - - sort.Sort(byDirentName(dirents)) - return dirents, nil + return } diff --git a/posix.go b/posix.go index d177e3c29..f1f2c4ddb 100644 --- a/posix.go +++ b/posix.go @@ -19,38 +19,24 @@ package main import ( "io" "os" - slashpath "path" + "path" "path/filepath" "strings" - "sync" "syscall" - "path" - "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/safe" ) const ( - fsListLimit = 1000 fsMinSpacePercent = 5 ) -// listParams - list object params used for list object map -type listParams struct { - bucket string - recursive bool - marker string - prefix string -} - // fsStorage - implements StorageAPI interface. type fsStorage struct { - diskPath string - minFreeDisk int64 - listObjectMap map[listParams][]*treeWalker - listObjectMapMutex *sync.Mutex + diskPath string + minFreeDisk int64 } // isDirEmpty - returns whether given directory is empty or not. @@ -98,10 +84,8 @@ func newPosix(diskPath string) (StorageAPI, error) { return nil, syscall.ENOTDIR } fs := fsStorage{ - diskPath: diskPath, - minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free. - listObjectMap: make(map[listParams][]*treeWalker), - listObjectMapMutex: &sync.Mutex{}, + diskPath: diskPath, + minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free. } log.WithFields(logrus.Fields{ "diskPath": diskPath, @@ -158,25 +142,23 @@ func removeDuplicateVols(volsInfo []VolInfo) []VolInfo { // gets all the unique directories from diskPath. func getAllUniqueVols(dirPath string) ([]VolInfo, error) { - volumeFn := func(dirent fsDirent) bool { - // Return all directories. - return dirent.IsDir() && isValidVolname(path.Clean(dirent.name)) - } - namesOnly := true // Returned are only names. - dirents, err := scandir(dirPath, volumeFn, namesOnly) + entries, err := readDir(dirPath) if err != nil { log.WithFields(logrus.Fields{ - "dirPath": dirPath, - "namesOnly": true, - }).Debugf("Scandir failed with error %s", err) + "dirPath": dirPath, + }).Debugf("readDir failed with error %s", err) return nil, err } var volsInfo []VolInfo - for _, dirent := range dirents { - fi, err := os.Stat(pathJoin(dirPath, dirent.name)) + for _, entry := range entries { + if !strings.HasSuffix(entry, slashSeparator) || !isValidVolname(filepath.Clean(entry)) { + // Skip if entry is neither a directory not a valid volume name. + continue + } + fi, err := os.Stat(pathJoin(dirPath, entry)) if err != nil { log.WithFields(logrus.Fields{ - "path": pathJoin(dirPath, dirent.name), + "path": pathJoin(dirPath, entry), }).Debugf("Stat failed with error %s", err) return nil, err } @@ -372,62 +354,9 @@ func (s fsStorage) DeleteVol(volume string) error { return nil } -// Save the goroutine reference in the map -func (s *fsStorage) saveTreeWalk(params listParams, walker *treeWalker) { - s.listObjectMapMutex.Lock() - defer s.listObjectMapMutex.Unlock() - - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("saveTreeWalk has been invoked.") - - walkers, _ := s.listObjectMap[params] - walkers = append(walkers, walker) - - s.listObjectMap[params] = walkers - log.Debugf("Successfully saved in listObjectMap.") -} - -// Lookup the goroutine reference from map -func (s *fsStorage) lookupTreeWalk(params listParams) *treeWalker { - s.listObjectMapMutex.Lock() - defer s.listObjectMapMutex.Unlock() - - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("lookupTreeWalk has been invoked.") - if walkChs, ok := s.listObjectMap[params]; ok { - for i, walkCh := range walkChs { - if !walkCh.timedOut { - newWalkChs := walkChs[i+1:] - if len(newWalkChs) > 0 { - s.listObjectMap[params] = newWalkChs - } else { - delete(s.listObjectMap, params) - } - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("Found the previous saved listsObjects params.") - return walkCh - } - } - // As all channels are timed out, delete the map entry - delete(s.listObjectMap, params) - } - return nil -} - -// List operation. -func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error) { +// ListDir - return all the entries at the given directory path. +// If an entry is a directory it will be returned with a trailing "/". +func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) { // Verify if volume is valid and it exists. volumeDir, err := s.getVolumeDir(volume) if err != nil { @@ -435,100 +364,21 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun "diskPath": s.diskPath, "volume": volume, }).Debugf("getVolumeDir failed with %s", err) - return nil, true, err + return nil, err } - var fileInfos []FileInfo - - if marker != "" { - // Verify if marker has prefix. - if marker != "" && !strings.HasPrefix(marker, prefix) { - log.WithFields(logrus.Fields{ - "diskPath": s.diskPath, - "marker": marker, - "prefix": prefix, - }).Debugf("Marker doesn't have prefix in common.") - return nil, true, errInvalidArgument - } - } - - // Return empty response for a valid request when count is 0. - if count == 0 { - return nil, true, nil - } - - // Over flowing count - reset to fsListLimit. - if count < 0 || count > fsListLimit { - count = fsListLimit - } - - // Verify if prefix exists. - prefixDir := slashpath.Dir(prefix) - prefixRootDir := slashpath.Join(volumeDir, prefixDir) - if status, err := isDirExist(prefixRootDir); !status { - if err == nil { - // Prefix does not exist, not an error just respond empty list response. - return nil, true, nil - } else if err.Error() == syscall.ENOTDIR.Error() { - // Prefix exists as a file. - return nil, true, nil - } - + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { log.WithFields(logrus.Fields{ - "volumeDir": volumeDir, - "prefixRootDir": prefixRootDir, - }).Debugf("isDirExist returned an unhandled error %s", err) - - // Rest errors should be treated as failure. - return nil, true, err - } - - // Maximum 1000 files returned in a single call. - // Further calls will set right marker value to continue reading the rest of the files. - // popTreeWalker returns nil if the call to ListFiles is done for the first time. - // On further calls to ListFiles to retrive more files within the timeout period, - // popTreeWalker returns the channel from which rest of the objects can be retrieved. - walker := s.lookupTreeWalk(listParams{volume, recursive, marker, prefix}) - if walker == nil { - walker = startTreeWalk(filepath.ToSlash(s.diskPath), volume, prefix, marker, recursive) - } - nextMarker := "" - log.Debugf("Reading from the tree walk channel has begun.") - for i := 0; i < count; { - walkResult, ok := <-walker.ch - if !ok { - // Closed channel. - return fileInfos, true, nil + "diskPath": s.diskPath, + "volume": volume, + }).Debugf("Stat on the volume failed with %s", err) + if os.IsNotExist(err) { + return nil, errVolumeNotFound } - // For any walk error return right away. - if walkResult.err != nil { - log.WithFields(logrus.Fields{ - "diskPath": s.diskPath, - "volume": volume, - "prefix": prefix, - "marker": marker, - "recursive": recursive, - }).Debugf("Walk resulted in an error %s", walkResult.err) - return nil, true, walkResult.err - } - fileInfo := walkResult.fileInfo - fileInfo.Name = filepath.ToSlash(fileInfo.Name) - fileInfos = append(fileInfos, fileInfo) - // We have listed everything return. - if walkResult.end { - return fileInfos, true, nil - } - nextMarker = fileInfo.Name - i++ + return nil, err } - params := listParams{volume, recursive, nextMarker, prefix} - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("Save the tree walk into map for subsequent requests.") - s.saveTreeWalk(params, walker) - return fileInfos, false, nil + return readDir(pathJoin(volumeDir, dirPath)) } // ReadFile - read a file at a given offset. diff --git a/rpc-client.go b/rpc-client.go index 32068bf6b..513656646 100644 --- a/rpc-client.go +++ b/rpc-client.go @@ -31,7 +31,7 @@ import ( "github.com/Sirupsen/logrus" ) -type networkFS struct { +type networkStorage struct { netScheme string netAddr string netPath string @@ -109,7 +109,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) { } // Initialize network storage. - ndisk := &networkFS{ + ndisk := &networkStorage{ netScheme: "http", // TODO: fix for ssl rpc support. netAddr: netAddr, netPath: netPath, @@ -122,7 +122,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) { } // MakeVol - make a volume. -func (n networkFS) MakeVol(volume string) error { +func (n networkStorage) MakeVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { log.WithFields(logrus.Fields{ @@ -134,7 +134,7 @@ func (n networkFS) MakeVol(volume string) error { } // ListVols - List all volumes. -func (n networkFS) ListVols() (vols []VolInfo, err error) { +func (n networkStorage) ListVols() (vols []VolInfo, err error) { ListVols := ListVolsReply{} err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols) if err != nil { @@ -145,7 +145,7 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) { } // StatVol - get current Stat volume info. -func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { +func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { log.WithFields(logrus.Fields{ "volume": volume, @@ -156,7 +156,7 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { } // DeleteVol - Delete a volume. -func (n networkFS) DeleteVol(volume string) error { +func (n networkStorage) DeleteVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { log.WithFields(logrus.Fields{ @@ -170,7 +170,7 @@ func (n networkFS) DeleteVol(volume string) error { // File operations. // CreateFile - create file. -func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { +func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { writeURL := new(url.URL) writeURL.Scheme = n.netScheme writeURL.Host = n.netAddr @@ -205,7 +205,7 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, } // StatFile - get latest Stat information for a file at path. -func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) { +func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ Vol: volume, Path: path, @@ -220,7 +220,7 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) } // ReadFile - reads a file. -func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { +func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { readURL := new(url.URL) readURL.Scheme = n.netScheme readURL.Host = n.netAddr @@ -247,31 +247,24 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io return resp.Body, nil } -// ListFiles - List all files in a volume. -func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) { - listFilesReply := ListFilesReply{} - if err = n.rpcClient.Call("Storage.ListFilesHandler", ListFilesArgs{ - Vol: volume, - Prefix: prefix, - Marker: marker, - Recursive: recursive, - Count: count, - }, &listFilesReply); err != nil { +// ListDir - list all entries at prefix. +func (n networkStorage) ListDir(volume, path string) (entries []string, err error) { + if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{ + Vol: volume, + Path: path, + }, &entries); err != nil { log.WithFields(logrus.Fields{ - "volume": volume, - "prefix": prefix, - "marker": marker, - "recursive": recursive, - "count": count, - }).Debugf("Storage.ListFilesHandlers failed with %s", err) - return nil, true, toStorageErr(err) + "volume": volume, + "path": path, + }).Debugf("Storage.ListDirHandlers failed with %s", err) + return nil, toStorageErr(err) } // Return successfully unmarshalled results. - return listFilesReply.Files, listFilesReply.EOF, nil + return entries, nil } // DeleteFile - Delete a file at path. -func (n networkFS) DeleteFile(volume, path string) (err error) { +func (n networkStorage) DeleteFile(volume, path string) (err error) { reply := GenericReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ Vol: volume, @@ -287,7 +280,7 @@ func (n networkFS) DeleteFile(volume, path string) (err error) { } // RenameFile - Rename file. -func (n networkFS) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { +func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { reply := GenericReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{ SrcVol: srcVolume, diff --git a/rpc-server-datatypes.go b/rpc-server-datatypes.go index 60159b19a..7202e9668 100644 --- a/rpc-server-datatypes.go +++ b/rpc-server-datatypes.go @@ -27,21 +27,6 @@ type ListVolsReply struct { Vols []VolInfo } -// ListFilesArgs list file args. -type ListFilesArgs struct { - Vol string - Prefix string - Marker string - Recursive bool - Count int -} - -// ListFilesReply list file reply. -type ListFilesReply struct { - Files []FileInfo - EOF bool -} - // StatFileArgs stat file args. type StatFileArgs struct { Vol string @@ -54,6 +39,12 @@ type DeleteFileArgs struct { Path string } +// ListDirArgs list dir args. +type ListDirArgs struct { + Vol string + Path string +} + // RenameFileArgs rename file args. type RenameFileArgs struct { SrcVol string diff --git a/rpc-server.go b/rpc-server.go index 92f6749af..c65190993 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -69,28 +69,6 @@ func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error /// File operations -// ListFilesHandler - list files handler. -func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesReply) error { - files, eof, err := s.storage.ListFiles(arg.Vol, arg.Prefix, arg.Marker, arg.Recursive, arg.Count) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": arg.Vol, - "prefix": arg.Prefix, - "marker": arg.Marker, - "recursive": arg.Recursive, - "count": arg.Count, - }).Debugf("ListFiles failed with error %s", err) - return err - } - - // Fill reply structure. - reply.Files = files - reply.EOF = eof - - // Return success. - return nil -} - // StatFileHandler - stat file handler is rpc wrapper to stat file. func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error { fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path) @@ -105,6 +83,20 @@ func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) erro return nil } +// ListDirHandler - list directory handler is rpc wrapper to list dir. +func (s *storageServer) ListDirHandler(arg *ListDirArgs, reply *[]string) error { + entries, err := s.storage.ListDir(arg.Vol, arg.Path) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": arg.Vol, + "path": arg.Path, + }).Debugf("ListDir failed with error %s", err) + return err + } + *reply = entries + return nil +} + // DeleteFileHandler - delete file handler is rpc wrapper to delete file. func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error { err := s.storage.DeleteFile(arg.Vol, arg.Path) diff --git a/storage-api-interface.go b/storage-api-interface.go index dff3865e7..c27c798c4 100644 --- a/storage-api-interface.go +++ b/storage-api-interface.go @@ -27,7 +27,7 @@ type StorageAPI interface { DeleteVol(volume string) (err error) // File operations. - ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) + ListDir(volume, dirPath string) ([]string, error) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error) StatFile(volume string, path string) (file FileInfo, err error) diff --git a/tree-walk.go b/tree-walk.go new file mode 100644 index 000000000..3aaae2279 --- /dev/null +++ b/tree-walk.go @@ -0,0 +1,284 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "os" + "path" + "sort" + "strings" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// listParams - list object params used for list object map +type listParams struct { + bucket string + recursive bool + marker string + prefix string +} + +// Tree walk result carries results of tree walking. +type treeWalkResult struct { + fileInfo FileInfo + err error + end bool +} + +// Tree walk notify carries a channel which notifies tree walk +// results, additionally it also carries information if treeWalk +// should be timedOut. +type treeWalker struct { + ch <-chan treeWalkResult + timedOut bool +} + +// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. +func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool { + // Example: + // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively + // called with prefixDir="one/two/three/four/" and marker="five.txt" + + // Convert entry to FileInfo + entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) { + if strings.HasSuffix(entry, slashSeparator) { + // Object name needs to be full path. + fileInfo.Name = path.Join(prefixDir, entry) + fileInfo.Name += slashSeparator + fileInfo.Mode = os.ModeDir + return + } + if fileInfo, err = disk.StatFile(bucket, path.Join(prefixDir, entry)); err != nil { + return + } + // Object name needs to be full path. + fileInfo.Name = path.Join(prefixDir, entry) + return + } + + var markerBase, markerDir string + if marker != "" { + // Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt" + markerSplit := strings.SplitN(marker, slashSeparator, 2) + markerDir = markerSplit[0] + if len(markerSplit) == 2 { + markerDir += slashSeparator + markerBase = markerSplit[1] + } + } + entries, err := disk.ListDir(bucket, prefixDir) + if err != nil { + send(treeWalkResult{err: err}) + return false + } + if entryPrefixMatch != "" { + for i, entry := range entries { + if !strings.HasPrefix(entry, entryPrefixMatch) { + entries[i] = "" + } + if hasReservedPrefix(entry) || hasReservedSuffix(entry) { + entries[i] = "" + } + } + } + sort.StringSlice(entries).Sort() + // Skip the empty strings + for len(entries) > 0 && entries[0] == "" { + entries = entries[1:] + } + if len(entries) == 0 { + return true + } + // example: + // If markerDir="four/" Search() returns the index of "four/" in the sorted + // entries list so we skip all the entries till "four/" + idx := sort.StringSlice(entries).Search(markerDir) + entries = entries[idx:] + *count += len(entries) + for i, entry := range entries { + if i == 0 && markerDir == entry { + if !recursive { + // Skip as the marker would already be listed in the previous listing. + *count-- + continue + } + if recursive && !strings.HasSuffix(entry, slashSeparator) { + // We should not skip for recursive listing and if markerDir is a directory + // for ex. if marker is "four/five.txt" markerDir will be "four/" which + // should not be skipped, instead it will need to be treeWalk()'ed into. + + // Skip if it is a file though as it would be listed in previous listing. + *count-- + continue + } + } + + if recursive && strings.HasSuffix(entry, slashSeparator) { + // If the entry is a directory, we will need recurse into it. + markerArg := "" + if entry == markerDir { + // We need to pass "five.txt" as marker only if we are + // recursing into "four/" + markerArg = markerBase + } + *count-- + prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. + if !treeWalk(disk, bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) { + return false + } + continue + } + *count-- + fileInfo, err := entryToFileInfo(entry) + if err != nil { + // The file got deleted in the interim between ListDir() and StatFile() + // Ignore error and continue. + continue + } + if !send(treeWalkResult{fileInfo: fileInfo}) { + return false + } + } + return true +} + +// Initiate a new treeWalk in a goroutine. +func startTreeWalk(layer ObjectLayer, bucket, prefix, marker string, recursive bool) *treeWalker { + // Example 1 + // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" + // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" + // and entryPrefixMatch="" + + // Example 2 + // if prefix is "one/two/th" and marker is "one/two/three/four/five.txt" + // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" + // and entryPrefixMatch="th" + var disk StorageAPI + switch l := layer.(type) { + case xlObjects: + disk = l.storage + case fsObjects: + disk = l.storage + } + + ch := make(chan treeWalkResult, maxObjectList) + walkNotify := treeWalker{ch: ch} + entryPrefixMatch := prefix + prefixDir := "" + lastIndex := strings.LastIndex(prefix, slashSeparator) + if lastIndex != -1 { + entryPrefixMatch = prefix[lastIndex+1:] + prefixDir = prefix[:lastIndex+1] + } + count := 0 + marker = strings.TrimPrefix(marker, prefixDir) + go func() { + defer close(ch) + send := func(walkResult treeWalkResult) bool { + if count == 0 { + walkResult.end = true + } + timer := time.After(time.Second * 60) + select { + case ch <- walkResult: + return true + case <-timer: + walkNotify.timedOut = true + return false + } + } + treeWalk(disk, bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count) + }() + return &walkNotify +} + +// Save the goroutine reference in the map +func saveTreeWalk(layer ObjectLayer, params listParams, walker *treeWalker) { + var listObjectMap map[listParams][]*treeWalker + var listObjectMapMutex *sync.Mutex + switch l := layer.(type) { + case xlObjects: + listObjectMap = l.listObjectMap + listObjectMapMutex = l.listObjectMapMutex + case fsObjects: + listObjectMap = l.listObjectMap + listObjectMapMutex = l.listObjectMapMutex + } + listObjectMapMutex.Lock() + defer listObjectMapMutex.Unlock() + + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("saveTreeWalk has been invoked.") + + walkers, _ := listObjectMap[params] + walkers = append(walkers, walker) + + listObjectMap[params] = walkers + log.Debugf("Successfully saved in listObjectMap.") +} + +// Lookup the goroutine reference from map +func lookupTreeWalk(layer ObjectLayer, params listParams) *treeWalker { + var listObjectMap map[listParams][]*treeWalker + var listObjectMapMutex *sync.Mutex + switch l := layer.(type) { + case xlObjects: + listObjectMap = l.listObjectMap + listObjectMapMutex = l.listObjectMapMutex + case fsObjects: + listObjectMap = l.listObjectMap + listObjectMapMutex = l.listObjectMapMutex + } + listObjectMapMutex.Lock() + defer listObjectMapMutex.Unlock() + + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("lookupTreeWalk has been invoked.") + if walkChs, ok := listObjectMap[params]; ok { + for i, walkCh := range walkChs { + if !walkCh.timedOut { + newWalkChs := walkChs[i+1:] + if len(newWalkChs) > 0 { + listObjectMap[params] = newWalkChs + } else { + delete(listObjectMap, params) + } + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("Found the previous saved listsObjects params.") + return walkCh + } + } + // As all channels are timed out, delete the map entry + delete(listObjectMap, params) + } + return nil +} diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index f52383992..83020af37 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -21,9 +21,10 @@ import ( "io" "os" slashpath "path" - "sort" "strings" + "path" + "github.com/Sirupsen/logrus" "github.com/klauspost/reedsolomon" ) @@ -377,254 +378,35 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { return volInfo, nil } -// isLeafDirectory - check if a given path is leaf directory. i.e -// there are no more directories inside it. Erasure code backend -// format it means that the parent directory is the actual object name. -func isLeafDirectory(disk StorageAPI, volume, leafPath string) (isLeaf bool) { - var markerPath string - var xlListCount = 1000 // Count page. - for { - fileInfos, eof, err := disk.ListFiles(volume, leafPath, markerPath, false, xlListCount) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "leafPath": leafPath, - "markerPath": markerPath, - "recursive": false, - "count": xlListCount, - }).Errorf("ListFiles failed with %s", err) - break - } - for _, fileInfo := range fileInfos { - if fileInfo.Mode.IsDir() { - // Directory found, not a leaf directory, return right here. - return false - } - } - if eof { - break - } - // MarkerPath to get the next set of files. - markerPath = fileInfos[len(fileInfos)-1].Name - } - // Exhausted all the entries, no directories found must be leaf - // return right here. - return true +// isLeafDirectoryXL - check if a given path is leaf directory. i.e +// if it contains file xlMetaV1File +func isLeafDirectoryXL(disk StorageAPI, volume, leafPath string) (isLeaf bool) { + _, err := disk.StatFile(volume, path.Join(leafPath, xlMetaV1File)) + return err == nil } -// extractMetadata - extract xl metadata. -func extractMetadata(disk StorageAPI, volume, path string) (xlMetaV1, error) { - xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File) - // We are not going to read partial data from metadata file, - // read the whole file always. - offset := int64(0) - metadataReader, err := disk.ReadFile(volume, xlMetaV1FilePath, offset) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": xlMetaV1FilePath, - "offset": offset, - }).Errorf("ReadFile failed with %s", err) - return xlMetaV1{}, err - } - // Close metadata reader. - defer metadataReader.Close() - - metadata, err := xlMetaV1Decode(metadataReader) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": xlMetaV1FilePath, - "offset": offset, - }).Errorf("xlMetaV1Decode failed with %s", err) - return xlMetaV1{}, err - } - return metadata, nil -} - -// Extract file info from paths. -func extractFileInfo(disk StorageAPI, volume, path string) (FileInfo, error) { - fileInfo := FileInfo{} - fileInfo.Volume = volume - fileInfo.Name = path - - metadata, err := extractMetadata(disk, volume, path) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Errorf("extractMetadata failed with %s", err) - return FileInfo{}, err - } - fileInfo.Size = metadata.Stat.Size - fileInfo.ModTime = metadata.Stat.ModTime - fileInfo.Mode = os.FileMode(0644) // This is a file already. - return fileInfo, nil -} - -// byFileInfoName is a collection satisfying sort.Interface. -type byFileInfoName []FileInfo - -func (d byFileInfoName) Len() int { return len(d) } -func (d byFileInfoName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } -func (d byFileInfoName) Less(i, j int) bool { return d[i].Name < d[j].Name } - -// ListFiles files at prefix. -func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) { +// ListDir - return all the entries at the given directory path. +// If an entry is a directory it will be returned with a trailing "/". +func (xl XL) ListDir(volume, dirPath string) (entries []string, err error) { if !isValidVolname(volume) { - return nil, true, errInvalidArgument + return nil, errInvalidArgument } - - // TODO: Fix: If readQuorum is met, its assumed that disks are in consistent file list. - // exclude disks those are not in consistent file list and check count of remaining disks - // are met readQuorum. - - // Treat empty file list specially - emptyCount := 0 - errCount := 0 - successCount := 0 - - var firstFilesInfo []FileInfo - var firstEOF bool - var firstErr error + // FIXME: need someway to figure out which disk has the latest namespace + // so that Listing can be done there. One option is always do Listing from + // the "local" disk - if it is down user has to list using another XL server. + // This way user knows from which disk he is listing from. for _, disk := range xl.storageDisks { - if filesInfo, eof, err = listFiles(disk, volume, prefix, marker, recursive, count); err == nil { - // we need to return first successful result - if firstFilesInfo == nil { - firstFilesInfo = filesInfo - firstEOF = eof + if entries, err = disk.ListDir(volume, dirPath); err != nil { + continue + } + for i, entry := range entries { + if strings.HasSuffix(entry, slashSeparator) && isLeafDirectoryXL(disk, volume, path.Join(dirPath, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) } - - if len(filesInfo) == 0 { - emptyCount++ - } else { - successCount++ - } - } else { - if firstErr == nil { - firstErr = err - } - - errCount++ } } - - if errCount >= xl.readQuorum { - return nil, false, firstErr - } else if successCount >= xl.readQuorum { - return firstFilesInfo, firstEOF, nil - } else if emptyCount >= xl.readQuorum { - return []FileInfo{}, true, nil - } - - return nil, false, errReadQuorum -} - -func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) { - var fsFilesInfo []FileInfo - var markerPath = marker - if marker != "" { - isLeaf := isLeafDirectory(disk, volume, retainSlash(marker)) - if isLeaf { - // For leaf for now we just point to the first block, make it - // dynamic in future based on the availability of storage disks. - markerPath = slashpath.Join(marker, xlMetaV1File) - } - } - - // Loop and capture the proper fileInfos, requires extraction and - // separation of XL related metadata information. - for { - fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "prefix": prefix, - "marker": markerPath, - "recursive": recursive, - "count": count, - }).Errorf("ListFiles failed with %s", err) - return nil, true, err - } - for _, fsFileInfo := range fsFilesInfo { - // Skip metadata files. - if strings.HasSuffix(fsFileInfo.Name, xlMetaV1File) { - continue - } - var fileInfo FileInfo - var isLeaf bool - if fsFileInfo.Mode.IsDir() { - isLeaf = isLeafDirectory(disk, volume, fsFileInfo.Name) - } - if isLeaf || !fsFileInfo.Mode.IsDir() { - // Extract the parent of leaf directory or file to get the - // actual name. - path := slashpath.Dir(fsFileInfo.Name) - fileInfo, err = extractFileInfo(disk, volume, path) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Errorf("extractFileInfo failed with %s", err) - // For a leaf directory, if err is FileNotFound then - // perhaps has a missing metadata. Ignore it and let - // healing finish its job it will become available soon. - if err == errFileNotFound { - continue - } - // For any other errors return to the caller. - return nil, true, err - } - } else { - fileInfo = fsFileInfo - } - filesInfo = append(filesInfo, fileInfo) - count-- - if count == 0 { - break - } - } - if len(fsFilesInfo) > 0 { - // markerPath for the next disk.ListFiles() iteration. - markerPath = fsFilesInfo[len(fsFilesInfo)-1].Name - } - if count == 0 && recursive && !strings.HasSuffix(markerPath, xlMetaV1File) { - // If last entry is not file.json then loop once more to check if we have reached eof. - fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, 1) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "prefix": prefix, - "marker": markerPath, - "recursive": recursive, - "count": 1, - }).Errorf("ListFiles failed with %s", err) - return nil, true, err - } - if !eof { - // file.N and file.json are always in pairs and hence this - // entry has to be file.json. If not better to manually investigate - // and fix it. - // For the next ListFiles() call we can safely assume that the - // marker is "object/file.json" - if !strings.HasSuffix(fsFilesInfo[0].Name, xlMetaV1File) { - log.WithFields(logrus.Fields{ - "volume": volume, - "prefix": prefix, - "fsFileInfo.Name": fsFilesInfo[0].Name, - }).Errorf("ListFiles failed with %s, expected %s to be a file.json file.", err, fsFilesInfo[0].Name) - return nil, true, errUnexpected - } - } - } - if count == 0 || eof { - break - } - } - // Sort to make sure we sort entries back properly. - sort.Sort(byFileInfoName(filesInfo)) - return filesInfo, eof, nil + return } // Object API. diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 70db1492d..b4bd3c8ae 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -62,7 +62,7 @@ func partNumToPartFileName(partNum int) string { // ListMultipartUploads - list multipart uploads. func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - return listMultipartUploadsCommon(xl.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + return listMultipartUploadsCommon(xl, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // NewMultipartUpload - initialize a new multipart upload, returns a unique id. diff --git a/xl-objects.go b/xl-objects.go index e6f147a31..f49fedd58 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -22,7 +22,9 @@ import ( "path" "path/filepath" "strings" + "sync" + "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/mimedb" ) @@ -33,9 +35,17 @@ const ( // xlObjects - Implements fs object layer. type xlObjects struct { - storage StorageAPI + storage StorageAPI + listObjectMap map[listParams][]*treeWalker + listObjectMapMutex *sync.Mutex } +func isLeafDirectory(disk StorageAPI, volume, leafPath string) bool { + _, err := disk.StatFile(volume, pathJoin(leafPath, multipartMetaFile)) + return err == nil +} + +// FIXME: constructor should return a pointer. // newXLObjects - initialize new xl object layer. func newXLObjects(exportPaths ...string) (ObjectLayer, error) { storage, err := newXL(exportPaths...) @@ -47,7 +57,11 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) { cleanupAllTmpEntries(storage) // Return successfully initialized object layer. - return xlObjects{storage}, nil + return xlObjects{ + storage: storage, + listObjectMap: make(map[listParams][]*treeWalker), + listObjectMapMutex: &sync.Mutex{}, + }, nil } /// Bucket operations @@ -226,13 +240,17 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { return nil } -// TODO - support non-recursive case, figure out file size for files uploaded using multipart. - func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} } + // Verify whether the bucket exists. + if isExist, err := isBucketExist(xl.storage, bucket); err != nil { + return ListObjectsInfo{}, err + } else if !isExist { + return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + } if !IsValidObjectPrefix(prefix) { return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} } @@ -261,64 +279,91 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey if delimiter == slashSeparator { recursive = false } - var allFileInfos, fileInfos []FileInfo + + walker := lookupTreeWalk(xl, listParams{bucket, recursive, marker, prefix}) + if walker == nil { + walker = startTreeWalk(xl, bucket, prefix, marker, recursive) + } + var fileInfos []FileInfo var eof bool var err error - for { - fileInfos, eof, err = xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) - if err != nil { - return ListObjectsInfo{}, toObjectErr(err, bucket) + var nextMarker string + log.Debugf("Reading from the tree walk channel has begun.") + for i := 0; i < maxKeys; { + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break } - for _, fileInfo := range fileInfos { - // FIXME: use fileInfo.Mode.IsDir() instead after fixing the bug in - // XL listing which is not reseting the Mode to 0 for leaf dirs. - if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) { - // Set the Mode to a "regular" file. - var info MultipartObjectInfo - info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) - if err == nil { - fileInfo.Mode = 0 - - fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator) - fileInfo.Size = info.Size - fileInfo.ModTime = info.ModTime - fileInfo.MD5Sum = info.MD5Sum - } else if err != errFileNotFound { - return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) - } - allFileInfos = append(allFileInfos, fileInfo) - maxKeys-- - continue - } else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) { - fileInfo.Name = path.Dir(fileInfo.Name) - var info MultipartObjectInfo - info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) - if err != nil { - return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) - } - fileInfo.Size = info.Size - fileInfo.ModTime = info.ModTime - fileInfo.MD5Sum = info.MD5Sum - allFileInfos = append(allFileInfos, fileInfo) - maxKeys-- - continue - } else if strings.HasSuffix(fileInfo.Name, multipartSuffix) { - continue + // For any walk error return right away. + if walkResult.err != nil { + log.WithFields(logrus.Fields{ + "bucket": bucket, + "prefix": prefix, + "marker": marker, + "recursive": recursive, + }).Debugf("Walk resulted in an error %s", walkResult.err) + // File not found is a valid case. + if walkResult.err == errFileNotFound { + return ListObjectsInfo{}, nil } - allFileInfos = append(allFileInfos, fileInfo) - maxKeys-- + return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) } - if maxKeys == 0 { - break - } - if eof { + fileInfo := walkResult.fileInfo + if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) { + // Code flow reaches here for non-recursive listing. + + var info MultipartObjectInfo + info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) + if err == nil { + // Set the Mode to a "regular" file. + fileInfo.Mode = 0 + fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator) + fileInfo.Size = info.Size + fileInfo.MD5Sum = info.MD5Sum + fileInfo.ModTime = info.ModTime + } else if err != errFileNotFound { + return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) + } + } else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) { + // Code flow reaches here for recursive listing. + + // for object/00000.minio.multipart, strip the base name + // and calculate get the object size. + fileInfo.Name = path.Dir(fileInfo.Name) + var info MultipartObjectInfo + info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) + if err != nil { + return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) + } + fileInfo.Size = info.Size + } else if strings.HasSuffix(fileInfo.Name, multipartSuffix) { + // Ignore the part files like object/00001.minio.multipart + continue + } + nextMarker = fileInfo.Name + fileInfos = append(fileInfos, fileInfo) + if walkResult.end { + eof = true break } + i++ + } + params := listParams{bucket, recursive, nextMarker, prefix} + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("Save the tree walk into map for subsequent requests.") + if !eof { + saveTreeWalk(xl, params, walker) } result := ListObjectsInfo{IsTruncated: !eof} - for _, fileInfo := range allFileInfos { + for _, fileInfo := range fileInfos { // With delimiter set we fill in NextMarker and Prefixes. if delimiter == slashSeparator { result.NextMarker = fileInfo.Name