object: move go-routine listing from posix to objectLayer. (#1491)

This commit is contained in:
Krishna Srinivas 2016-05-06 01:21:56 +05:30 committed by Harshavardhana
parent 46680788f9
commit 247e835d7b
16 changed files with 767 additions and 1007 deletions

View File

@ -24,7 +24,7 @@ import (
// ListMultipartUploads - list multipart uploads.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return listMultipartUploadsCommon(fs.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
return listMultipartUploadsCommon(fs, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
// NewMultipartUpload - initialize a new multipart upload, returns a unique id.
@ -100,8 +100,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Cleanup all the parts.
recursive := false
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID, recursive); err != nil {
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil {
return "", err
}

View File

@ -20,15 +20,20 @@ import (
"io"
"path/filepath"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/mimedb"
)
// fsObjects - Implements fs object layer.
type fsObjects struct {
storage StorageAPI
storage StorageAPI
listObjectMap map[listParams][]*treeWalker
listObjectMapMutex *sync.Mutex
}
// FIXME: constructor should return a pointer.
// newFSObjects - initialize new fs object layer.
func newFSObjects(exportPath string) (ObjectLayer, error) {
var storage StorageAPI
@ -51,7 +56,11 @@ func newFSObjects(exportPath string) (ObjectLayer, error) {
cleanupAllTmpEntries(storage)
// Return successfully initialized object layer.
return fsObjects{storage}, nil
return fsObjects{
storage: storage,
listObjectMap: make(map[listParams][]*treeWalker),
listObjectMapMutex: &sync.Mutex{},
}, nil
}
/// Bucket operations
@ -151,6 +160,12 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if isExist, err := isBucketExist(fs.storage, bucket); err != nil {
return ListObjectsInfo{}, err
} else if !isExist {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
@ -170,17 +185,71 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
}
}
if maxKeys == 0 {
return ListObjectsInfo{}, nil
}
// Over flowing count - reset to maxObjectList.
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == slashSeparator {
recursive = false
}
fileInfos, eof, err := fs.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket)
walker := lookupTreeWalk(fs, listParams{bucket, recursive, marker, prefix})
if walker == nil {
walker = startTreeWalk(fs, bucket, prefix, marker, recursive)
}
if maxKeys == 0 {
return ListObjectsInfo{}, nil
var fileInfos []FileInfo
var eof bool
var nextMarker string
log.Debugf("Reading from the tree walk channel has begun.")
for i := 0; i < maxKeys; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
log.WithFields(logrus.Fields{
"bucket": bucket,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err)
// File not found is a valid case.
if walkResult.err == errFileNotFound {
return ListObjectsInfo{}, nil
}
return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
}
fileInfo := walkResult.fileInfo
nextMarker = fileInfo.Name
fileInfos = append(fileInfos, fileInfo)
if walkResult.end {
eof = true
break
}
i++
}
if len(fileInfos) == 0 {
eof = true
}
params := listParams{bucket, recursive, nextMarker, prefix}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Save the tree walk into map for subsequent requests.")
if !eof {
saveTreeWalk(fs, params, walker)
}
result := ListObjectsInfo{IsTruncated: !eof}

View File

@ -535,6 +535,7 @@ func TestListObjects(t *testing.T) {
t.Errorf("Test %d: Expected to pass, but failed with: <ERROR> %s", i+1, err.Error())
}
if err == nil && !testCase.shouldPass {
t.Log(result)
t.Errorf("Test %d: Expected to fail with <ERROR> \"%s\", but passed instead", i+1, testCase.err.Error())
}
// Failed as expected, but does it fail for the expected reason.
@ -543,11 +544,15 @@ func TestListObjects(t *testing.T) {
t.Errorf("Test %d: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead", i+1, testCase.err.Error(), err.Error())
}
}
// Since there are cases for which ListObjects fails, this is necessary.
// Test passes as expected, but the output values are verified for correctness here.
// Since there are cases for which ListObjects fails, this is
// necessary. Test passes as expected, but the output values
// are verified for correctness here.
if err == nil && testCase.shouldPass {
// The length of the expected ListObjectsResult.Objects should match in both expected result from test cases and in the output.
// On failure calling t.Fatalf, otherwise it may lead to index out of range error in assertion following this.
// The length of the expected ListObjectsResult.Objects
// should match in both expected result from test cases
// and in the output. On failure calling t.Fatalf,
// otherwise it may lead to index out of range error in
// assertion following this.
if len(testCase.result.Objects) != len(result.Objects) {
t.Fatalf("Test %d: Expected number of object in the result to be '%d', but found '%d' objects instead", i+1, len(testCase.result.Objects), len(result.Objects))
}

View File

@ -23,9 +23,9 @@ import (
"io"
"io/ioutil"
"path"
"sort"
"strconv"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/skyrings/skyring-common/tools/uuid"
@ -164,37 +164,24 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
// Cleanup all temp entries inside tmpMetaPrefix directory, upon server initialization.
func cleanupAllTmpEntries(storage StorageAPI) error {
recursive := true // Recursively delete all files inside 'tmp' directory.
return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "", recursive)
return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "")
}
// Wrapper to which removes all the uploaded parts after a successful
// complete multipart upload.
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string, recursive bool) error {
markerPath := ""
var wg = &sync.WaitGroup{}
for {
uploadIDPath := path.Join(prefix, bucket, object, uploadID)
fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, recursive, 1000)
if err != nil {
return toObjectErr(err, bucket, object)
}
// Loop through all files and delete each in go-routine, while
// adding each operation to a wait group.
for _, fileInfo := range fileInfos {
wg.Add(1)
go func(fi FileInfo) {
defer wg.Done()
storage.DeleteFile(minioMetaBucket, fi.Name)
}(fileInfo)
}
if eof {
break
}
markerPath = fileInfos[len(fileInfos)-1].Name
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error {
multipartDir := path.Join(prefix, bucket, object)
entries, err := storage.ListDir(minioMetaBucket, multipartDir)
if err != nil {
return err
}
for _, entry := range entries {
if strings.HasPrefix(entry, uploadID) {
if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil {
return err
}
}
}
// Wait for all the routines.
wg.Wait()
return nil
}
@ -213,47 +200,40 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
} else if !status {
return InvalidUploadID{UploadID: uploadID}
}
recursive := false // Cleanup all the top level files and folders matching uploadID.
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID, recursive)
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID)
}
// listLeafEntries - lists all entries if a given prefixPath is a leaf
// directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory.
func listLeafEntries(storage StorageAPI, prefixPath string) (entries []FileInfo, e error) {
var markerPath string
for {
fileInfos, eof, err := storage.ListFiles(minioMetaBucket, prefixPath, markerPath, false, 1000)
if err != nil {
log.WithFields(logrus.Fields{
"prefixPath": prefixPath,
"markerPath": markerPath,
}).Errorf("%s", err)
return nil, err
}
for _, fileInfo := range fileInfos {
// Set marker for next batch of ListFiles.
markerPath = fileInfo.Name
if fileInfo.Mode.IsDir() {
// If a directory is found, doesn't return anything.
return nil, nil
}
fileName := path.Base(fileInfo.Name)
if !strings.Contains(fileName, ".") {
// Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum
// and retain entries of the pattern bucket/object/uploadID
entries = append(entries, fileInfo)
}
}
if eof {
break
func listLeafEntries(storage StorageAPI, prefixPath string) (entries []string, err error) {
entries, err = storage.ListDir(minioMetaBucket, prefixPath)
if err != nil {
return nil, err
}
for _, entry := range entries {
if strings.HasSuffix(entry, slashSeparator) {
return nil, nil
}
}
return entries, nil
}
// listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket.
func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) {
func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPath string, recursive bool, maxKeys int) (fileInfos []FileInfo, eof bool, err error) {
var storage StorageAPI
switch l := layer.(type) {
case fsObjects:
storage = l.storage
case xlObjects:
storage = l.storage
}
walker := lookupTreeWalk(layer, listParams{minioMetaBucket, recursive, markerPath, prefixPath})
if walker == nil {
walker = startTreeWalk(layer, minioMetaBucket, prefixPath, markerPath, recursive)
}
// newMaxKeys tracks the size of entries which are going to be
// returned back.
var newMaxKeys int
@ -261,64 +241,53 @@ func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerP
// Following loop gathers and filters out special files inside
// minio meta volume.
for {
var fileInfos []FileInfo
// List files up to maxKeys-newMaxKeys, since we are skipping
// entries for special files.
fileInfos, eof, err = storage.ListFiles(minioMetaBucket, prefixPath, markerPath, recursive, maxKeys-newMaxKeys)
if err != nil {
log.WithFields(logrus.Fields{
"prefixPath": prefixPath,
"markerPath": markerPath,
"recursive": recursive,
"maxKeys": maxKeys,
}).Errorf("%s", err)
return nil, true, err
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
// Loop through and validate individual file.
for _, fi := range fileInfos {
var entries []FileInfo
if fi.Mode.IsDir() {
// List all the entries if fi.Name is a leaf directory, if
// fi.Name is not a leaf directory then the resulting
// entries are empty.
entries, err = listLeafEntries(storage, fi.Name)
// For any walk error return right away.
if walkResult.err != nil {
log.WithFields(logrus.Fields{
"bucket": minioMetaBucket,
"prefix": prefixPath,
"marker": markerPath,
"recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err)
// File not found is a valid case.
if walkResult.err == errFileNotFound {
return nil, true, nil
}
return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath)
}
fi := walkResult.fileInfo
var entries []string
if fi.Mode.IsDir() {
// List all the entries if fi.Name is a leaf directory, if
// fi.Name is not a leaf directory then the resulting
// entries are empty.
entries, err = listLeafEntries(storage, fi.Name)
if err != nil {
log.WithFields(logrus.Fields{
"prefixPath": fi.Name,
}).Errorf("%s", err)
return nil, false, err
}
}
if len(entries) > 0 {
// We reach here for non-recursive case and a leaf entry.
sort.Strings(entries)
for _, entry := range entries {
if strings.ContainsRune(entry, '.') {
continue
}
var fileInfo FileInfo
fileInfo, err = storage.StatFile(minioMetaBucket, path.Join(fi.Name, entry))
if err != nil {
log.WithFields(logrus.Fields{
"prefixPath": fi.Name,
}).Errorf("%s", err)
return nil, false, err
}
}
// Set markerPath for next batch of listing.
markerPath = fi.Name
if len(entries) > 0 {
// We reach here for non-recursive case and a leaf entry.
for _, entry := range entries {
allFileInfos = append(allFileInfos, entry)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Return values:
// allFileInfos : "maxKeys" number of entries.
// eof : eof returned by fs.storage.ListFiles()
// error : nil
return
}
}
} else {
// We reach here for a non-recursive case non-leaf entry
// OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum]
if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries.
// Skip files matching pattern bucket/object/uploadID.partNum.md5sum
// and retain files matching pattern bucket/object/uploadID
specialFile := path.Base(fi.Name)
if strings.Contains(specialFile, ".") {
// Contains partnumber and md5sum info, skip this.
continue
}
}
allFileInfos = append(allFileInfos, fi)
fileInfos = append(fileInfos, fileInfo)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
@ -330,20 +299,39 @@ func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerP
return
}
}
}
// If we have reached eof then we break out.
if eof {
break
} else {
// We reach here for a non-recursive case non-leaf entry
// OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum]
if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries.
// Skip files matching pattern bucket/object/uploadID.partNum.md5sum
// and retain files matching pattern bucket/object/uploadID
specialFile := path.Base(fi.Name)
if strings.Contains(specialFile, ".") {
// Contains partnumber and md5sum info, skip this.
continue
}
}
fileInfos = append(fileInfos, fi)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Return values:
// allFileInfos : "maxKeys" number of entries.
// eof : eof returned by fs.storage.ListFiles()
// error : nil
return
}
}
}
// Return entries here.
return allFileInfos, eof, nil
return fileInfos, eof, nil
}
// listMultipartUploadsCommon - lists all multipart uploads, common
// function for both object layers.
func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -401,7 +389,7 @@ func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, u
}
// List all the multipart files at prefixPath, starting with marker keyMarkerPath.
fileInfos, eof, err := listMetaBucketMultipartFiles(storage, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads)
fileInfos, eof, err := listMetaBucketMultipartFiles(layer, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads)
if err != nil {
log.WithFields(logrus.Fields{
"prefixPath": multipartPrefixPath,
@ -446,61 +434,60 @@ func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, u
func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListPartsInfo{}, (BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
return ListPartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object})
}
if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil {
return ListPartsInfo{}, err
} else if !status {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID})
}
result := ListPartsInfo{}
var markerPath string
nextPartNumberMarker := 0
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// Figure out the marker for the next subsequent calls, if the
// partNumberMarker is already set.
if partNumberMarker > 0 {
partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "."
fileInfos, _, err := storage.ListFiles(minioMetaBucket, partNumberMarkerPath, "", false, 1)
if err != nil {
return result, toObjectErr(err, minioMetaBucket, partNumberMarkerPath)
}
if len(fileInfos) == 0 {
return result, InvalidPart{}
}
markerPath = fileInfos[0].Name
}
uploadIDPrefix := uploadIDPath + "."
fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPrefix, markerPath, false, maxParts)
entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
if err != nil {
return result, InvalidPart{}
return result, err
}
for _, fileInfo := range fileInfos {
fileName := path.Base(fileInfo.Name)
splitResult := strings.Split(fileName, ".")
partNum, err := strconv.Atoi(splitResult[1])
if err != nil {
return result, err
sort.Strings(entries)
var newEntries []string
for _, entry := range entries {
if !strings.Contains(entry, ".") {
continue
}
if !strings.HasPrefix(entry, uploadID) {
continue
}
newEntries = append(newEntries, entry)
}
idx := sort.SearchStrings(newEntries, fmt.Sprintf("%s.%.5d.", uploadID, partNumberMarker+1))
newEntries = newEntries[idx:]
count := maxParts
for _, entry := range newEntries {
fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, entry))
splitEntry := strings.Split(entry, ".")
partNum, err := strconv.Atoi(splitEntry[1])
if err != nil {
return ListPartsInfo{}, err
}
md5sum := splitResult[2]
result.Parts = append(result.Parts, partInfo{
PartNumber: partNum,
LastModified: fileInfo.ModTime,
ETag: md5sum,
Size: fileInfo.Size,
LastModified: fi.ModTime,
ETag: splitEntry[2],
Size: fi.Size,
})
nextPartNumberMarker = partNum
count--
if count == 0 {
break
}
}
if len(newEntries) > len(result.Parts) {
result.IsTruncated = true
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.PartNumberMarker = partNumberMarker
result.NextPartNumberMarker = nextPartNumberMarker
result.MaxParts = maxParts
result.IsTruncated = !eof
return result, nil
}

View File

@ -1,228 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"os"
"path"
"sort"
"strings"
"time"
)
// fsDirent carries directory entries.
type fsDirent struct {
name string
modTime time.Time // On Solaris and older unix distros this is empty.
size int64 // On Solaris and older unix distros this is empty.
mode os.FileMode
}
// IsDir - returns true if fsDirent is a directory
func (ent fsDirent) IsDir() bool {
return ent.mode.IsDir()
}
// IsSymlink - returns true if fsDirent is a symbolic link
func (ent fsDirent) IsSymlink() bool {
return ent.mode&os.ModeSymlink == os.ModeSymlink
}
// IsRegular - returns true if fsDirent is a regular file
func (ent fsDirent) IsRegular() bool {
return ent.mode.IsRegular()
}
// byDirentName is a collection satisfying sort.Interface.
type byDirentName []fsDirent
func (d byDirentName) Len() int { return len(d) }
func (d byDirentName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name }
// Using sort.Search() internally to jump to the file entry containing
// the prefix.
func searchDirents(dirents []fsDirent, x string) int {
processFunc := func(i int) bool {
return dirents[i].name >= x
}
return sort.Search(len(dirents), processFunc)
}
// Tree walk result carries results of tree walking.
type treeWalkResult struct {
fileInfo FileInfo
err error
end bool
}
// Tree walk notify carries a channel which notifies tree walk
// results, additionally it also carries information if treeWalk
// should be timedOut.
type treeWalker struct {
ch <-chan treeWalkResult
timedOut bool
}
// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
// Convert dirent to FileInfo
direntToFileInfo := func(dirent fsDirent) (FileInfo, error) {
fileInfo := FileInfo{}
// Convert to full object name.
fileInfo.Name = path.Join(prefixDir, dirent.name)
if dirent.modTime.IsZero() && dirent.size == 0 {
// ModifiedTime and Size are zero, Stat() and figure out
// the actual values that need to be set.
fi, err := os.Stat(path.Join(bucketDir, prefixDir, dirent.name))
if err != nil {
return FileInfo{}, err
}
// Fill size and modtime.
fileInfo.ModTime = fi.ModTime()
fileInfo.Size = fi.Size()
fileInfo.Mode = fi.Mode()
} else {
// If ModTime or Size are set then use them
// without attempting another Stat operation.
fileInfo.ModTime = dirent.modTime
fileInfo.Size = dirent.size
fileInfo.Mode = dirent.mode
}
if fileInfo.Mode.IsDir() {
// Add "/" suffix again for directories as
fileInfo.Size = 0
fileInfo.Name += "/"
}
return fileInfo, nil
}
var markerBase, markerDir string
if marker != "" {
// Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt"
markerSplit := strings.SplitN(marker, slashSeparator, 2)
markerDir = markerSplit[0]
if len(markerSplit) == 2 {
markerDir += slashSeparator
markerBase = markerSplit[1]
}
}
// Entry prefix match function.
prefixMatchFn := func(dirent fsDirent) bool {
if dirent.IsDir() || dirent.IsRegular() {
// Does dirent name has reserved prefixes or suffixes.
hasReserved := hasReservedPrefix(dirent.name) || hasReservedSuffix(dirent.name)
// All dirents which match prefix and do not have reserved
// keywords in them are valid entries.
return strings.HasPrefix(dirent.name, entryPrefixMatch) && !hasReserved && isValidPath(dirent.name)
}
return false
}
// scandir returns entries that begins with entryPrefixMatch
dirents, err := scandir(path.Join(bucketDir, prefixDir), prefixMatchFn, true)
if err != nil {
send(treeWalkResult{err: err})
return false
}
// example:
// If markerDir="four/" searchDirents() returns the index of "four/" in the sorted
// dirents list. We skip all the dirent entries till "four/"
dirents = dirents[searchDirents(dirents, markerDir):]
*count += len(dirents)
for i, dirent := range dirents {
if i == 0 && markerDir == dirent.name && !dirent.IsDir() {
// If the first entry is not a directory
// we need to skip this entry.
*count--
continue
}
if dirent.IsDir() && recursive {
// If the entry is a directory, we will need recurse into it.
markerArg := ""
if dirent.name == markerDir {
// We need to pass "five.txt" as marker only if we are
// recursing into "four/"
markerArg = markerBase
}
*count--
if !treeWalk(bucketDir, path.Join(prefixDir, dirent.name), "", markerArg, recursive, send, count) {
return false
}
continue
}
fileInfo, err := direntToFileInfo(dirent)
if err != nil {
send(treeWalkResult{err: err})
return false
}
*count--
if !send(treeWalkResult{fileInfo: fileInfo}) {
return false
}
}
return true
}
// Initiate a new treeWalk in a goroutine.
func startTreeWalk(fsPath, bucket, prefix, marker string, recursive bool) *treeWalker {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
// and entryPrefixMatch=""
// Example 2
// if prefix is "one/two/th" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
ch := make(chan treeWalkResult, fsListLimit)
walkNotify := treeWalker{ch: ch}
entryPrefixMatch := prefix
prefixDir := ""
lastIndex := strings.LastIndex(prefix, slashSeparator)
if lastIndex != -1 {
entryPrefixMatch = prefix[lastIndex+1:]
prefixDir = prefix[:lastIndex+1]
}
count := 0
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
defer close(ch)
send := func(walkResult treeWalkResult) bool {
if count == 0 {
walkResult.end = true
}
timer := time.After(time.Second * 60)
select {
case ch <- walkResult:
return true
case <-timer:
walkNotify.timedOut = true
return false
}
}
bucketDir := path.Join(fsPath, bucket)
treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker, recursive, send, &count)
}()
return &walkNotify
}

View File

@ -22,9 +22,11 @@ import (
"os"
"path"
"runtime"
"sort"
"strings"
"syscall"
"unsafe"
"github.com/Sirupsen/logrus"
)
const (
@ -47,9 +49,8 @@ func clen(n []byte) int {
// parseDirents - inspired from
// https://golang.org/src/syscall/syscall_<os>.go
func parseDirents(dirPath string, buf []byte) []fsDirent {
func parseDirents(dirPath string, buf []byte) (entries []string, err error) {
bufidx := 0
dirents := []fsDirent{}
for bufidx < len(buf) {
dirent := (*syscall.Dirent)(unsafe.Pointer(&buf[bufidx]))
// On non-Linux operating systems for rec length of zero means
@ -58,7 +59,7 @@ func parseDirents(dirPath string, buf []byte) []fsDirent {
break
}
bufidx += int(dirent.Reclen)
// Skip dirents if they are absent in directory.
// Skip if they are absent in directory.
if isEmptyDirent(dirent) {
continue
}
@ -69,56 +70,57 @@ func parseDirents(dirPath string, buf []byte) []fsDirent {
continue
}
var mode os.FileMode
switch dirent.Type {
case syscall.DT_BLK, syscall.DT_WHT:
mode = os.ModeDevice
case syscall.DT_CHR:
mode = os.ModeDevice | os.ModeCharDevice
case syscall.DT_DIR:
mode = os.ModeDir
case syscall.DT_FIFO:
mode = os.ModeNamedPipe
case syscall.DT_LNK:
mode = os.ModeSymlink
entries = append(entries, name+slashSeparator)
case syscall.DT_REG:
mode = 0
case syscall.DT_SOCK:
mode = os.ModeSocket
entries = append(entries, name)
case syscall.DT_UNKNOWN:
// On Linux XFS does not implement d_type for on disk
// format << v5. Fall back to Stat().
if fi, err := os.Stat(path.Join(dirPath, name)); err == nil {
mode = fi.Mode()
var fi os.FileInfo
if fi, err = os.Stat(path.Join(dirPath, name)); err == nil {
if fi.IsDir() {
entries = append(entries, fi.Name()+slashSeparator)
} else if fi.Mode().IsRegular() {
entries = append(entries, fi.Name())
}
} else {
// Caller listing would fail, if Stat failed but we
// won't crash the server.
mode = 0xffffffff
// This is unexpected.
return
}
default:
// Skip entries which are not file or directory.
// FIXME: should we handle symlinks?
continue
}
dirents = append(dirents, fsDirent{
name: name,
mode: mode,
})
}
return dirents
return
}
// scans the directory dirPath, calling filter() on each directory
// entry. Entries for which filter() returns true are stored, lexically
// sorted using sort.Sort(). If filter is NULL, all entries are selected.
// If namesOnly is true, dirPath is not appended into entry name.
func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) {
// Return all the entries at the directory dirPath.
func readDir(dirPath string) (entries []string, err error) {
buf := make([]byte, readDirentBufSize)
d, err := os.Open(dirPath)
if err != nil {
log.WithFields(logrus.Fields{
"dirPath": dirPath,
}).Debugf("Open failed with %s", err)
// File is really not found.
if os.IsNotExist(err) {
return nil, errFileNotFound
}
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return nil, errFileNotFound
}
return nil, err
}
defer d.Close()
fd := int(d.Fd())
dirents := []fsDirent{}
for {
nbuf, err := syscall.ReadDirent(fd, buf)
if err != nil {
@ -127,20 +129,11 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi
if nbuf <= 0 {
break
}
for _, dirent := range parseDirents(dirPath, buf[:nbuf]) {
if !namesOnly {
dirent.name = path.Join(dirPath, dirent.name)
}
if dirent.IsDir() {
dirent.name += "/"
dirent.size = 0
}
if filter == nil || filter(dirent) {
dirents = append(dirents, dirent)
}
var tmpEntries []string
if tmpEntries, err = parseDirents(dirPath, buf[:nbuf]); err != nil {
return nil, err
}
entries = append(entries, tmpEntries...)
}
sort.Sort(byDirentName(dirents))
return dirents, nil
return
}

View File

@ -21,22 +21,32 @@ package main
import (
"io"
"os"
"path"
"sort"
"strings"
"github.com/Sirupsen/logrus"
)
// scans the directory dirPath, calling filter() on each directory
// entry. Entries for which filter() returns true are stored, lexically
// sorted using sort.Sort(). If filter is NULL, all entries are selected.
// If namesOnly is true, dirPath is not appended into entry name.
func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) {
// Return all the entries at the directory dirPath.
func readDir(dirPath string) (entries []string, err error) {
d, err := os.Open(dirPath)
if err != nil {
log.WithFields(logrus.Fields{
"dirPath": dirPath,
}).Debugf("Open failed with %s", err)
// File is really not found.
if os.IsNotExist(err) {
return nil, errFileNotFound
}
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return nil, errFileNotFound
}
return nil, err
}
defer d.Close()
var dirents []fsDirent
for {
fis, err := d.Readdir(1000)
if err != nil {
@ -46,25 +56,13 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi
return nil, err
}
for _, fi := range fis {
dirent := fsDirent{
name: fi.Name(),
modTime: fi.ModTime(),
size: fi.Size(),
mode: fi.Mode(),
}
if !namesOnly {
dirent.name = path.Join(dirPath, dirent.name)
}
if dirent.IsDir() {
if fi.Mode().IsDir() {
// append "/" instead of "\" so that sorting is done as expected.
dirent.name += slashSeparator
}
if filter == nil || filter(dirent) {
dirents = append(dirents, dirent)
entries = append(entries, fi.Name()+slashSeparator)
} else if fi.Mode().IsRegular() {
entries = append(entries, fi.Name())
}
}
}
sort.Sort(byDirentName(dirents))
return dirents, nil
return
}

208
posix.go
View File

@ -19,38 +19,24 @@ package main
import (
"io"
"os"
slashpath "path"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"path"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/safe"
)
const (
fsListLimit = 1000
fsMinSpacePercent = 5
)
// listParams - list object params used for list object map
type listParams struct {
bucket string
recursive bool
marker string
prefix string
}
// fsStorage - implements StorageAPI interface.
type fsStorage struct {
diskPath string
minFreeDisk int64
listObjectMap map[listParams][]*treeWalker
listObjectMapMutex *sync.Mutex
diskPath string
minFreeDisk int64
}
// isDirEmpty - returns whether given directory is empty or not.
@ -98,10 +84,8 @@ func newPosix(diskPath string) (StorageAPI, error) {
return nil, syscall.ENOTDIR
}
fs := fsStorage{
diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
listObjectMap: make(map[listParams][]*treeWalker),
listObjectMapMutex: &sync.Mutex{},
diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
}
log.WithFields(logrus.Fields{
"diskPath": diskPath,
@ -158,25 +142,23 @@ func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
// gets all the unique directories from diskPath.
func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
volumeFn := func(dirent fsDirent) bool {
// Return all directories.
return dirent.IsDir() && isValidVolname(path.Clean(dirent.name))
}
namesOnly := true // Returned are only names.
dirents, err := scandir(dirPath, volumeFn, namesOnly)
entries, err := readDir(dirPath)
if err != nil {
log.WithFields(logrus.Fields{
"dirPath": dirPath,
"namesOnly": true,
}).Debugf("Scandir failed with error %s", err)
"dirPath": dirPath,
}).Debugf("readDir failed with error %s", err)
return nil, err
}
var volsInfo []VolInfo
for _, dirent := range dirents {
fi, err := os.Stat(pathJoin(dirPath, dirent.name))
for _, entry := range entries {
if !strings.HasSuffix(entry, slashSeparator) || !isValidVolname(filepath.Clean(entry)) {
// Skip if entry is neither a directory not a valid volume name.
continue
}
fi, err := os.Stat(pathJoin(dirPath, entry))
if err != nil {
log.WithFields(logrus.Fields{
"path": pathJoin(dirPath, dirent.name),
"path": pathJoin(dirPath, entry),
}).Debugf("Stat failed with error %s", err)
return nil, err
}
@ -372,62 +354,9 @@ func (s fsStorage) DeleteVol(volume string) error {
return nil
}
// Save the goroutine reference in the map
func (s *fsStorage) saveTreeWalk(params listParams, walker *treeWalker) {
s.listObjectMapMutex.Lock()
defer s.listObjectMapMutex.Unlock()
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("saveTreeWalk has been invoked.")
walkers, _ := s.listObjectMap[params]
walkers = append(walkers, walker)
s.listObjectMap[params] = walkers
log.Debugf("Successfully saved in listObjectMap.")
}
// Lookup the goroutine reference from map
func (s *fsStorage) lookupTreeWalk(params listParams) *treeWalker {
s.listObjectMapMutex.Lock()
defer s.listObjectMapMutex.Unlock()
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("lookupTreeWalk has been invoked.")
if walkChs, ok := s.listObjectMap[params]; ok {
for i, walkCh := range walkChs {
if !walkCh.timedOut {
newWalkChs := walkChs[i+1:]
if len(newWalkChs) > 0 {
s.listObjectMap[params] = newWalkChs
} else {
delete(s.listObjectMap, params)
}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Found the previous saved listsObjects params.")
return walkCh
}
}
// As all channels are timed out, delete the map entry
delete(s.listObjectMap, params)
}
return nil
}
// List operation.
func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error) {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolumeDir(volume)
if err != nil {
@ -435,100 +364,21 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun
"diskPath": s.diskPath,
"volume": volume,
}).Debugf("getVolumeDir failed with %s", err)
return nil, true, err
return nil, err
}
var fileInfos []FileInfo
if marker != "" {
// Verify if marker has prefix.
if marker != "" && !strings.HasPrefix(marker, prefix) {
log.WithFields(logrus.Fields{
"diskPath": s.diskPath,
"marker": marker,
"prefix": prefix,
}).Debugf("Marker doesn't have prefix in common.")
return nil, true, errInvalidArgument
}
}
// Return empty response for a valid request when count is 0.
if count == 0 {
return nil, true, nil
}
// Over flowing count - reset to fsListLimit.
if count < 0 || count > fsListLimit {
count = fsListLimit
}
// Verify if prefix exists.
prefixDir := slashpath.Dir(prefix)
prefixRootDir := slashpath.Join(volumeDir, prefixDir)
if status, err := isDirExist(prefixRootDir); !status {
if err == nil {
// Prefix does not exist, not an error just respond empty list response.
return nil, true, nil
} else if err.Error() == syscall.ENOTDIR.Error() {
// Prefix exists as a file.
return nil, true, nil
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
log.WithFields(logrus.Fields{
"volumeDir": volumeDir,
"prefixRootDir": prefixRootDir,
}).Debugf("isDirExist returned an unhandled error %s", err)
// Rest errors should be treated as failure.
return nil, true, err
}
// Maximum 1000 files returned in a single call.
// Further calls will set right marker value to continue reading the rest of the files.
// popTreeWalker returns nil if the call to ListFiles is done for the first time.
// On further calls to ListFiles to retrive more files within the timeout period,
// popTreeWalker returns the channel from which rest of the objects can be retrieved.
walker := s.lookupTreeWalk(listParams{volume, recursive, marker, prefix})
if walker == nil {
walker = startTreeWalk(filepath.ToSlash(s.diskPath), volume, prefix, marker, recursive)
}
nextMarker := ""
log.Debugf("Reading from the tree walk channel has begun.")
for i := 0; i < count; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
return fileInfos, true, nil
"diskPath": s.diskPath,
"volume": volume,
}).Debugf("Stat on the volume failed with %s", err)
if os.IsNotExist(err) {
return nil, errVolumeNotFound
}
// For any walk error return right away.
if walkResult.err != nil {
log.WithFields(logrus.Fields{
"diskPath": s.diskPath,
"volume": volume,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err)
return nil, true, walkResult.err
}
fileInfo := walkResult.fileInfo
fileInfo.Name = filepath.ToSlash(fileInfo.Name)
fileInfos = append(fileInfos, fileInfo)
// We have listed everything return.
if walkResult.end {
return fileInfos, true, nil
}
nextMarker = fileInfo.Name
i++
return nil, err
}
params := listParams{volume, recursive, nextMarker, prefix}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Save the tree walk into map for subsequent requests.")
s.saveTreeWalk(params, walker)
return fileInfos, false, nil
return readDir(pathJoin(volumeDir, dirPath))
}
// ReadFile - read a file at a given offset.

View File

@ -31,7 +31,7 @@ import (
"github.com/Sirupsen/logrus"
)
type networkFS struct {
type networkStorage struct {
netScheme string
netAddr string
netPath string
@ -109,7 +109,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
}
// Initialize network storage.
ndisk := &networkFS{
ndisk := &networkStorage{
netScheme: "http", // TODO: fix for ssl rpc support.
netAddr: netAddr,
netPath: netPath,
@ -122,7 +122,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
}
// MakeVol - make a volume.
func (n networkFS) MakeVol(volume string) error {
func (n networkStorage) MakeVol(volume string) error {
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil {
log.WithFields(logrus.Fields{
@ -134,7 +134,7 @@ func (n networkFS) MakeVol(volume string) error {
}
// ListVols - List all volumes.
func (n networkFS) ListVols() (vols []VolInfo, err error) {
func (n networkStorage) ListVols() (vols []VolInfo, err error) {
ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols)
if err != nil {
@ -145,7 +145,7 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) {
}
// StatVol - get current Stat volume info.
func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
@ -156,7 +156,7 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
}
// DeleteVol - Delete a volume.
func (n networkFS) DeleteVol(volume string) error {
func (n networkStorage) DeleteVol(volume string) error {
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil {
log.WithFields(logrus.Fields{
@ -170,7 +170,7 @@ func (n networkFS) DeleteVol(volume string) error {
// File operations.
// CreateFile - create file.
func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
writeURL := new(url.URL)
writeURL.Scheme = n.netScheme
writeURL.Host = n.netAddr
@ -205,7 +205,7 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser,
}
// StatFile - get latest Stat information for a file at path.
func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) {
func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) {
if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{
Vol: volume,
Path: path,
@ -220,7 +220,7 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error)
}
// ReadFile - reads a file.
func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) {
func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) {
readURL := new(url.URL)
readURL.Scheme = n.netScheme
readURL.Host = n.netAddr
@ -247,31 +247,24 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io
return resp.Body, nil
}
// ListFiles - List all files in a volume.
func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) {
listFilesReply := ListFilesReply{}
if err = n.rpcClient.Call("Storage.ListFilesHandler", ListFilesArgs{
Vol: volume,
Prefix: prefix,
Marker: marker,
Recursive: recursive,
Count: count,
}, &listFilesReply); err != nil {
// ListDir - list all entries at prefix.
func (n networkStorage) ListDir(volume, path string) (entries []string, err error) {
if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{
Vol: volume,
Path: path,
}, &entries); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
"count": count,
}).Debugf("Storage.ListFilesHandlers failed with %s", err)
return nil, true, toStorageErr(err)
"volume": volume,
"path": path,
}).Debugf("Storage.ListDirHandlers failed with %s", err)
return nil, toStorageErr(err)
}
// Return successfully unmarshalled results.
return listFilesReply.Files, listFilesReply.EOF, nil
return entries, nil
}
// DeleteFile - Delete a file at path.
func (n networkFS) DeleteFile(volume, path string) (err error) {
func (n networkStorage) DeleteFile(volume, path string) (err error) {
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{
Vol: volume,
@ -287,7 +280,7 @@ func (n networkFS) DeleteFile(volume, path string) (err error) {
}
// RenameFile - Rename file.
func (n networkFS) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
reply := GenericReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{
SrcVol: srcVolume,

View File

@ -27,21 +27,6 @@ type ListVolsReply struct {
Vols []VolInfo
}
// ListFilesArgs list file args.
type ListFilesArgs struct {
Vol string
Prefix string
Marker string
Recursive bool
Count int
}
// ListFilesReply list file reply.
type ListFilesReply struct {
Files []FileInfo
EOF bool
}
// StatFileArgs stat file args.
type StatFileArgs struct {
Vol string
@ -54,6 +39,12 @@ type DeleteFileArgs struct {
Path string
}
// ListDirArgs list dir args.
type ListDirArgs struct {
Vol string
Path string
}
// RenameFileArgs rename file args.
type RenameFileArgs struct {
SrcVol string

View File

@ -69,28 +69,6 @@ func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error
/// File operations
// ListFilesHandler - list files handler.
func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesReply) error {
files, eof, err := s.storage.ListFiles(arg.Vol, arg.Prefix, arg.Marker, arg.Recursive, arg.Count)
if err != nil {
log.WithFields(logrus.Fields{
"volume": arg.Vol,
"prefix": arg.Prefix,
"marker": arg.Marker,
"recursive": arg.Recursive,
"count": arg.Count,
}).Debugf("ListFiles failed with error %s", err)
return err
}
// Fill reply structure.
reply.Files = files
reply.EOF = eof
// Return success.
return nil
}
// StatFileHandler - stat file handler is rpc wrapper to stat file.
func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error {
fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path)
@ -105,6 +83,20 @@ func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) erro
return nil
}
// ListDirHandler - list directory handler is rpc wrapper to list dir.
func (s *storageServer) ListDirHandler(arg *ListDirArgs, reply *[]string) error {
entries, err := s.storage.ListDir(arg.Vol, arg.Path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": arg.Vol,
"path": arg.Path,
}).Debugf("ListDir failed with error %s", err)
return err
}
*reply = entries
return nil
}
// DeleteFileHandler - delete file handler is rpc wrapper to delete file.
func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error {
err := s.storage.DeleteFile(arg.Vol, arg.Path)

View File

@ -27,7 +27,7 @@ type StorageAPI interface {
DeleteVol(volume string) (err error)
// File operations.
ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error)
ListDir(volume, dirPath string) ([]string, error)
ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error)
CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error)
StatFile(volume string, path string) (file FileInfo, err error)

284
tree-walk.go Normal file
View File

@ -0,0 +1,284 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"os"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// listParams - list object params used for list object map
type listParams struct {
bucket string
recursive bool
marker string
prefix string
}
// Tree walk result carries results of tree walking.
type treeWalkResult struct {
fileInfo FileInfo
err error
end bool
}
// Tree walk notify carries a channel which notifies tree walk
// results, additionally it also carries information if treeWalk
// should be timedOut.
type treeWalker struct {
ch <-chan treeWalkResult
timedOut bool
}
// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
// Convert entry to FileInfo
entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) {
if strings.HasSuffix(entry, slashSeparator) {
// Object name needs to be full path.
fileInfo.Name = path.Join(prefixDir, entry)
fileInfo.Name += slashSeparator
fileInfo.Mode = os.ModeDir
return
}
if fileInfo, err = disk.StatFile(bucket, path.Join(prefixDir, entry)); err != nil {
return
}
// Object name needs to be full path.
fileInfo.Name = path.Join(prefixDir, entry)
return
}
var markerBase, markerDir string
if marker != "" {
// Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt"
markerSplit := strings.SplitN(marker, slashSeparator, 2)
markerDir = markerSplit[0]
if len(markerSplit) == 2 {
markerDir += slashSeparator
markerBase = markerSplit[1]
}
}
entries, err := disk.ListDir(bucket, prefixDir)
if err != nil {
send(treeWalkResult{err: err})
return false
}
if entryPrefixMatch != "" {
for i, entry := range entries {
if !strings.HasPrefix(entry, entryPrefixMatch) {
entries[i] = ""
}
if hasReservedPrefix(entry) || hasReservedSuffix(entry) {
entries[i] = ""
}
}
}
sort.StringSlice(entries).Sort()
// Skip the empty strings
for len(entries) > 0 && entries[0] == "" {
entries = entries[1:]
}
if len(entries) == 0 {
return true
}
// example:
// If markerDir="four/" Search() returns the index of "four/" in the sorted
// entries list so we skip all the entries till "four/"
idx := sort.StringSlice(entries).Search(markerDir)
entries = entries[idx:]
*count += len(entries)
for i, entry := range entries {
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.
*count--
continue
}
if recursive && !strings.HasSuffix(entry, slashSeparator) {
// We should not skip for recursive listing and if markerDir is a directory
// for ex. if marker is "four/five.txt" markerDir will be "four/" which
// should not be skipped, instead it will need to be treeWalk()'ed into.
// Skip if it is a file though as it would be listed in previous listing.
*count--
continue
}
}
if recursive && strings.HasSuffix(entry, slashSeparator) {
// If the entry is a directory, we will need recurse into it.
markerArg := ""
if entry == markerDir {
// We need to pass "five.txt" as marker only if we are
// recursing into "four/"
markerArg = markerBase
}
*count--
prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
if !treeWalk(disk, bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) {
return false
}
continue
}
*count--
fileInfo, err := entryToFileInfo(entry)
if err != nil {
// The file got deleted in the interim between ListDir() and StatFile()
// Ignore error and continue.
continue
}
if !send(treeWalkResult{fileInfo: fileInfo}) {
return false
}
}
return true
}
// Initiate a new treeWalk in a goroutine.
func startTreeWalk(layer ObjectLayer, bucket, prefix, marker string, recursive bool) *treeWalker {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
// and entryPrefixMatch=""
// Example 2
// if prefix is "one/two/th" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
var disk StorageAPI
switch l := layer.(type) {
case xlObjects:
disk = l.storage
case fsObjects:
disk = l.storage
}
ch := make(chan treeWalkResult, maxObjectList)
walkNotify := treeWalker{ch: ch}
entryPrefixMatch := prefix
prefixDir := ""
lastIndex := strings.LastIndex(prefix, slashSeparator)
if lastIndex != -1 {
entryPrefixMatch = prefix[lastIndex+1:]
prefixDir = prefix[:lastIndex+1]
}
count := 0
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
defer close(ch)
send := func(walkResult treeWalkResult) bool {
if count == 0 {
walkResult.end = true
}
timer := time.After(time.Second * 60)
select {
case ch <- walkResult:
return true
case <-timer:
walkNotify.timedOut = true
return false
}
}
treeWalk(disk, bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count)
}()
return &walkNotify
}
// Save the goroutine reference in the map
func saveTreeWalk(layer ObjectLayer, params listParams, walker *treeWalker) {
var listObjectMap map[listParams][]*treeWalker
var listObjectMapMutex *sync.Mutex
switch l := layer.(type) {
case xlObjects:
listObjectMap = l.listObjectMap
listObjectMapMutex = l.listObjectMapMutex
case fsObjects:
listObjectMap = l.listObjectMap
listObjectMapMutex = l.listObjectMapMutex
}
listObjectMapMutex.Lock()
defer listObjectMapMutex.Unlock()
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("saveTreeWalk has been invoked.")
walkers, _ := listObjectMap[params]
walkers = append(walkers, walker)
listObjectMap[params] = walkers
log.Debugf("Successfully saved in listObjectMap.")
}
// Lookup the goroutine reference from map
func lookupTreeWalk(layer ObjectLayer, params listParams) *treeWalker {
var listObjectMap map[listParams][]*treeWalker
var listObjectMapMutex *sync.Mutex
switch l := layer.(type) {
case xlObjects:
listObjectMap = l.listObjectMap
listObjectMapMutex = l.listObjectMapMutex
case fsObjects:
listObjectMap = l.listObjectMap
listObjectMapMutex = l.listObjectMapMutex
}
listObjectMapMutex.Lock()
defer listObjectMapMutex.Unlock()
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("lookupTreeWalk has been invoked.")
if walkChs, ok := listObjectMap[params]; ok {
for i, walkCh := range walkChs {
if !walkCh.timedOut {
newWalkChs := walkChs[i+1:]
if len(newWalkChs) > 0 {
listObjectMap[params] = newWalkChs
} else {
delete(listObjectMap, params)
}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Found the previous saved listsObjects params.")
return walkCh
}
}
// As all channels are timed out, delete the map entry
delete(listObjectMap, params)
}
return nil
}

View File

@ -21,9 +21,10 @@ import (
"io"
"os"
slashpath "path"
"sort"
"strings"
"path"
"github.com/Sirupsen/logrus"
"github.com/klauspost/reedsolomon"
)
@ -377,254 +378,35 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
return volInfo, nil
}
// isLeafDirectory - check if a given path is leaf directory. i.e
// there are no more directories inside it. Erasure code backend
// format it means that the parent directory is the actual object name.
func isLeafDirectory(disk StorageAPI, volume, leafPath string) (isLeaf bool) {
var markerPath string
var xlListCount = 1000 // Count page.
for {
fileInfos, eof, err := disk.ListFiles(volume, leafPath, markerPath, false, xlListCount)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"leafPath": leafPath,
"markerPath": markerPath,
"recursive": false,
"count": xlListCount,
}).Errorf("ListFiles failed with %s", err)
break
}
for _, fileInfo := range fileInfos {
if fileInfo.Mode.IsDir() {
// Directory found, not a leaf directory, return right here.
return false
}
}
if eof {
break
}
// MarkerPath to get the next set of files.
markerPath = fileInfos[len(fileInfos)-1].Name
}
// Exhausted all the entries, no directories found must be leaf
// return right here.
return true
// isLeafDirectoryXL - check if a given path is leaf directory. i.e
// if it contains file xlMetaV1File
func isLeafDirectoryXL(disk StorageAPI, volume, leafPath string) (isLeaf bool) {
_, err := disk.StatFile(volume, path.Join(leafPath, xlMetaV1File))
return err == nil
}
// extractMetadata - extract xl metadata.
func extractMetadata(disk StorageAPI, volume, path string) (xlMetaV1, error) {
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
// We are not going to read partial data from metadata file,
// read the whole file always.
offset := int64(0)
metadataReader, err := disk.ReadFile(volume, xlMetaV1FilePath, offset)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": xlMetaV1FilePath,
"offset": offset,
}).Errorf("ReadFile failed with %s", err)
return xlMetaV1{}, err
}
// Close metadata reader.
defer metadataReader.Close()
metadata, err := xlMetaV1Decode(metadataReader)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": xlMetaV1FilePath,
"offset": offset,
}).Errorf("xlMetaV1Decode failed with %s", err)
return xlMetaV1{}, err
}
return metadata, nil
}
// Extract file info from paths.
func extractFileInfo(disk StorageAPI, volume, path string) (FileInfo, error) {
fileInfo := FileInfo{}
fileInfo.Volume = volume
fileInfo.Name = path
metadata, err := extractMetadata(disk, volume, path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("extractMetadata failed with %s", err)
return FileInfo{}, err
}
fileInfo.Size = metadata.Stat.Size
fileInfo.ModTime = metadata.Stat.ModTime
fileInfo.Mode = os.FileMode(0644) // This is a file already.
return fileInfo, nil
}
// byFileInfoName is a collection satisfying sort.Interface.
type byFileInfoName []FileInfo
func (d byFileInfoName) Len() int { return len(d) }
func (d byFileInfoName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byFileInfoName) Less(i, j int) bool { return d[i].Name < d[j].Name }
// ListFiles files at prefix.
func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (xl XL) ListDir(volume, dirPath string) (entries []string, err error) {
if !isValidVolname(volume) {
return nil, true, errInvalidArgument
return nil, errInvalidArgument
}
// TODO: Fix: If readQuorum is met, its assumed that disks are in consistent file list.
// exclude disks those are not in consistent file list and check count of remaining disks
// are met readQuorum.
// Treat empty file list specially
emptyCount := 0
errCount := 0
successCount := 0
var firstFilesInfo []FileInfo
var firstEOF bool
var firstErr error
// FIXME: need someway to figure out which disk has the latest namespace
// so that Listing can be done there. One option is always do Listing from
// the "local" disk - if it is down user has to list using another XL server.
// This way user knows from which disk he is listing from.
for _, disk := range xl.storageDisks {
if filesInfo, eof, err = listFiles(disk, volume, prefix, marker, recursive, count); err == nil {
// we need to return first successful result
if firstFilesInfo == nil {
firstFilesInfo = filesInfo
firstEOF = eof
if entries, err = disk.ListDir(volume, dirPath); err != nil {
continue
}
for i, entry := range entries {
if strings.HasSuffix(entry, slashSeparator) && isLeafDirectoryXL(disk, volume, path.Join(dirPath, entry)) {
entries[i] = strings.TrimSuffix(entry, slashSeparator)
}
if len(filesInfo) == 0 {
emptyCount++
} else {
successCount++
}
} else {
if firstErr == nil {
firstErr = err
}
errCount++
}
}
if errCount >= xl.readQuorum {
return nil, false, firstErr
} else if successCount >= xl.readQuorum {
return firstFilesInfo, firstEOF, nil
} else if emptyCount >= xl.readQuorum {
return []FileInfo{}, true, nil
}
return nil, false, errReadQuorum
}
func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) {
var fsFilesInfo []FileInfo
var markerPath = marker
if marker != "" {
isLeaf := isLeafDirectory(disk, volume, retainSlash(marker))
if isLeaf {
// For leaf for now we just point to the first block, make it
// dynamic in future based on the availability of storage disks.
markerPath = slashpath.Join(marker, xlMetaV1File)
}
}
// Loop and capture the proper fileInfos, requires extraction and
// separation of XL related metadata information.
for {
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"marker": markerPath,
"recursive": recursive,
"count": count,
}).Errorf("ListFiles failed with %s", err)
return nil, true, err
}
for _, fsFileInfo := range fsFilesInfo {
// Skip metadata files.
if strings.HasSuffix(fsFileInfo.Name, xlMetaV1File) {
continue
}
var fileInfo FileInfo
var isLeaf bool
if fsFileInfo.Mode.IsDir() {
isLeaf = isLeafDirectory(disk, volume, fsFileInfo.Name)
}
if isLeaf || !fsFileInfo.Mode.IsDir() {
// Extract the parent of leaf directory or file to get the
// actual name.
path := slashpath.Dir(fsFileInfo.Name)
fileInfo, err = extractFileInfo(disk, volume, path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("extractFileInfo failed with %s", err)
// For a leaf directory, if err is FileNotFound then
// perhaps has a missing metadata. Ignore it and let
// healing finish its job it will become available soon.
if err == errFileNotFound {
continue
}
// For any other errors return to the caller.
return nil, true, err
}
} else {
fileInfo = fsFileInfo
}
filesInfo = append(filesInfo, fileInfo)
count--
if count == 0 {
break
}
}
if len(fsFilesInfo) > 0 {
// markerPath for the next disk.ListFiles() iteration.
markerPath = fsFilesInfo[len(fsFilesInfo)-1].Name
}
if count == 0 && recursive && !strings.HasSuffix(markerPath, xlMetaV1File) {
// If last entry is not file.json then loop once more to check if we have reached eof.
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, 1)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"marker": markerPath,
"recursive": recursive,
"count": 1,
}).Errorf("ListFiles failed with %s", err)
return nil, true, err
}
if !eof {
// file.N and file.json are always in pairs and hence this
// entry has to be file.json. If not better to manually investigate
// and fix it.
// For the next ListFiles() call we can safely assume that the
// marker is "object/file.json"
if !strings.HasSuffix(fsFilesInfo[0].Name, xlMetaV1File) {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"fsFileInfo.Name": fsFilesInfo[0].Name,
}).Errorf("ListFiles failed with %s, expected %s to be a file.json file.", err, fsFilesInfo[0].Name)
return nil, true, errUnexpected
}
}
}
if count == 0 || eof {
break
}
}
// Sort to make sure we sort entries back properly.
sort.Sort(byFileInfoName(filesInfo))
return filesInfo, eof, nil
return
}
// Object API.

View File

@ -62,7 +62,7 @@ func partNumToPartFileName(partNum int) string {
// ListMultipartUploads - list multipart uploads.
func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return listMultipartUploadsCommon(xl.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
return listMultipartUploadsCommon(xl, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
// NewMultipartUpload - initialize a new multipart upload, returns a unique id.

View File

@ -22,7 +22,9 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/mimedb"
)
@ -33,9 +35,17 @@ const (
// xlObjects - Implements fs object layer.
type xlObjects struct {
storage StorageAPI
storage StorageAPI
listObjectMap map[listParams][]*treeWalker
listObjectMapMutex *sync.Mutex
}
func isLeafDirectory(disk StorageAPI, volume, leafPath string) bool {
_, err := disk.StatFile(volume, pathJoin(leafPath, multipartMetaFile))
return err == nil
}
// FIXME: constructor should return a pointer.
// newXLObjects - initialize new xl object layer.
func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
storage, err := newXL(exportPaths...)
@ -47,7 +57,11 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
cleanupAllTmpEntries(storage)
// Return successfully initialized object layer.
return xlObjects{storage}, nil
return xlObjects{
storage: storage,
listObjectMap: make(map[listParams][]*treeWalker),
listObjectMapMutex: &sync.Mutex{},
}, nil
}
/// Bucket operations
@ -226,13 +240,17 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
return nil
}
// TODO - support non-recursive case, figure out file size for files uploaded using multipart.
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if isExist, err := isBucketExist(xl.storage, bucket); err != nil {
return ListObjectsInfo{}, err
} else if !isExist {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
@ -261,64 +279,91 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
if delimiter == slashSeparator {
recursive = false
}
var allFileInfos, fileInfos []FileInfo
walker := lookupTreeWalk(xl, listParams{bucket, recursive, marker, prefix})
if walker == nil {
walker = startTreeWalk(xl, bucket, prefix, marker, recursive)
}
var fileInfos []FileInfo
var eof bool
var err error
for {
fileInfos, eof, err = xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket)
var nextMarker string
log.Debugf("Reading from the tree walk channel has begun.")
for i := 0; i < maxKeys; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
for _, fileInfo := range fileInfos {
// FIXME: use fileInfo.Mode.IsDir() instead after fixing the bug in
// XL listing which is not reseting the Mode to 0 for leaf dirs.
if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) {
// Set the Mode to a "regular" file.
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err == nil {
fileInfo.Mode = 0
fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator)
fileInfo.Size = info.Size
fileInfo.ModTime = info.ModTime
fileInfo.MD5Sum = info.MD5Sum
} else if err != errFileNotFound {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
continue
} else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) {
fileInfo.Name = path.Dir(fileInfo.Name)
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
fileInfo.Size = info.Size
fileInfo.ModTime = info.ModTime
fileInfo.MD5Sum = info.MD5Sum
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
continue
} else if strings.HasSuffix(fileInfo.Name, multipartSuffix) {
continue
// For any walk error return right away.
if walkResult.err != nil {
log.WithFields(logrus.Fields{
"bucket": bucket,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err)
// File not found is a valid case.
if walkResult.err == errFileNotFound {
return ListObjectsInfo{}, nil
}
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
}
if maxKeys == 0 {
break
}
if eof {
fileInfo := walkResult.fileInfo
if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) {
// Code flow reaches here for non-recursive listing.
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err == nil {
// Set the Mode to a "regular" file.
fileInfo.Mode = 0
fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator)
fileInfo.Size = info.Size
fileInfo.MD5Sum = info.MD5Sum
fileInfo.ModTime = info.ModTime
} else if err != errFileNotFound {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
} else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) {
// Code flow reaches here for recursive listing.
// for object/00000.minio.multipart, strip the base name
// and calculate get the object size.
fileInfo.Name = path.Dir(fileInfo.Name)
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
fileInfo.Size = info.Size
} else if strings.HasSuffix(fileInfo.Name, multipartSuffix) {
// Ignore the part files like object/00001.minio.multipart
continue
}
nextMarker = fileInfo.Name
fileInfos = append(fileInfos, fileInfo)
if walkResult.end {
eof = true
break
}
i++
}
params := listParams{bucket, recursive, nextMarker, prefix}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Save the tree walk into map for subsequent requests.")
if !eof {
saveTreeWalk(xl, params, walker)
}
result := ListObjectsInfo{IsTruncated: !eof}
for _, fileInfo := range allFileInfos {
for _, fileInfo := range fileInfos {
// With delimiter set we fill in NextMarker and Prefixes.
if delimiter == slashSeparator {
result.NextMarker = fileInfo.Name