diff --git a/fs-objects.go b/fs-objects.go index 1737018a6..39a646a87 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -22,7 +22,6 @@ import ( "strings" "sync" - "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/mimedb" ) @@ -157,118 +156,5 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { } func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(fs.storage, bucket); err != nil { - return ListObjectsInfo{}, err - } else if !isExist { - return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} - } - if !IsValidObjectPrefix(prefix) { - return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListObjectsInfo{}, UnsupportedDelimiter{ - Delimiter: delimiter, - } - } - // Verify if marker has prefix. - if marker != "" { - if !strings.HasPrefix(marker, prefix) { - return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ - Marker: marker, - Prefix: prefix, - } - } - } - - if maxKeys == 0 { - return ListObjectsInfo{}, nil - } - - // Over flowing count - reset to maxObjectList. - if maxKeys < 0 || maxKeys > maxObjectList { - maxKeys = maxObjectList - } - - // Default is recursive, if delimiter is set then list non recursive. - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - walker := lookupTreeWalk(fs, listParams{bucket, recursive, marker, prefix}) - if walker == nil { - walker = startTreeWalk(fs, bucket, prefix, marker, recursive) - } - var fileInfos []FileInfo - var eof bool - var nextMarker string - log.Debugf("Reading from the tree walk channel has begun.") - for i := 0; i < maxKeys; { - walkResult, ok := <-walker.ch - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - log.WithFields(logrus.Fields{ - "bucket": bucket, - "prefix": prefix, - "marker": marker, - "recursive": recursive, - }).Debugf("Walk resulted in an error %s", walkResult.err) - // File not found is a valid case. - if walkResult.err == errFileNotFound { - return ListObjectsInfo{}, nil - } - return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) - } - fileInfo := walkResult.fileInfo - nextMarker = fileInfo.Name - fileInfos = append(fileInfos, fileInfo) - if walkResult.end { - eof = true - break - } - i++ - } - if len(fileInfos) == 0 { - eof = true - } - params := listParams{bucket, recursive, nextMarker, prefix} - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("Save the tree walk into map for subsequent requests.") - if !eof { - saveTreeWalk(fs, params, walker) - } - - result := ListObjectsInfo{IsTruncated: !eof} - for _, fileInfo := range fileInfos { - // With delimiter set we fill in NextMarker and Prefixes. - if delimiter == slashSeparator { - result.NextMarker = fileInfo.Name - if fileInfo.Mode.IsDir() { - result.Prefixes = append(result.Prefixes, fileInfo.Name) - continue - } - } - result.Objects = append(result.Objects, ObjectInfo{ - Name: fileInfo.Name, - ModTime: fileInfo.ModTime, - Size: fileInfo.Size, - IsDir: false, - }) - } - return result, nil + return listObjectsCommon(fs, bucket, prefix, marker, delimiter, maxKeys) } diff --git a/object-common.go b/object-common.go index d41136dc1..bad4eb548 100644 --- a/object-common.go +++ b/object-common.go @@ -22,6 +22,9 @@ import ( "io" "path" "sort" + "strings" + + "github.com/Sirupsen/logrus" ) // Common initialization needed for both object layers. @@ -191,6 +194,130 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6 return newMD5Hex, nil } +func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { + var disk StorageAPI + switch l := layer.(type) { + case xlObjects: + disk = l.storage + case fsObjects: + disk = l.storage + } + + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} + } + + // Verify whether the bucket exists. + if isExist, err := isBucketExist(disk, bucket); err != nil { + return ListObjectsInfo{}, err + } else if !isExist { + return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + } + + if !IsValidObjectPrefix(prefix) { + return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + } + // Verify if delimiter is anything other than '/', which we do not support. + if delimiter != "" && delimiter != slashSeparator { + return ListObjectsInfo{}, UnsupportedDelimiter{ + Delimiter: delimiter, + } + } + // Verify if marker has prefix. + if marker != "" { + if !strings.HasPrefix(marker, prefix) { + return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ + Marker: marker, + Prefix: prefix, + } + } + } + + if maxKeys == 0 { + return ListObjectsInfo{}, nil + } + + // Over flowing count - reset to maxObjectList. + if maxKeys < 0 || maxKeys > maxObjectList { + maxKeys = maxObjectList + } + + // Default is recursive, if delimiter is set then list non recursive. + recursive := true + if delimiter == slashSeparator { + recursive = false + } + + walker := lookupTreeWalk(layer, listParams{bucket, recursive, marker, prefix}) + if walker == nil { + walker = startTreeWalk(layer, bucket, prefix, marker, recursive) + } + var fileInfos []FileInfo + var eof bool + var nextMarker string + log.Debugf("Reading from the tree walk channel has begun.") + for i := 0; i < maxKeys; { + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break + } + // For any walk error return right away. + if walkResult.err != nil { + log.WithFields(logrus.Fields{ + "bucket": bucket, + "prefix": prefix, + "marker": marker, + "recursive": recursive, + }).Debugf("Walk resulted in an error %s", walkResult.err) + // File not found is a valid case. + if walkResult.err == errFileNotFound { + return ListObjectsInfo{}, nil + } + return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) + } + fileInfo := walkResult.fileInfo + nextMarker = fileInfo.Name + fileInfos = append(fileInfos, fileInfo) + if walkResult.end { + eof = true + break + } + i++ + } + params := listParams{bucket, recursive, nextMarker, prefix} + log.WithFields(logrus.Fields{ + "bucket": params.bucket, + "recursive": params.recursive, + "marker": params.marker, + "prefix": params.prefix, + }).Debugf("Save the tree walk into map for subsequent requests.") + if !eof { + saveTreeWalk(layer, params, walker) + } + + result := ListObjectsInfo{IsTruncated: !eof} + for _, fileInfo := range fileInfos { + // With delimiter set we fill in NextMarker and Prefixes. + if delimiter == slashSeparator { + result.NextMarker = fileInfo.Name + if fileInfo.Mode.IsDir() { + result.Prefixes = append(result.Prefixes, fileInfo.Name) + continue + } + } + result.Objects = append(result.Objects, ObjectInfo{ + Name: fileInfo.Name, + ModTime: fileInfo.ModTime, + Size: fileInfo.Size, + IsDir: false, + }) + } + return result, nil +} + // checks whether bucket exists. func isBucketExist(storage StorageAPI, bucketName string) (bool, error) { // Check whether bucket exists. diff --git a/tree-walk.go b/tree-walk.go index 3aaae2279..efd29b130 100644 --- a/tree-walk.go +++ b/tree-walk.go @@ -51,11 +51,21 @@ type treeWalker struct { } // treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool { +func treeWalk(layer ObjectLayer, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" + var isXL bool + var disk StorageAPI + switch l := layer.(type) { + case xlObjects: + isXL = true + disk = l.storage + case fsObjects: + disk = l.storage + } + // Convert entry to FileInfo entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) { if strings.HasSuffix(entry, slashSeparator) { @@ -65,6 +75,25 @@ func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker strin fileInfo.Mode = os.ModeDir return } + if isXL && strings.HasSuffix(entry, multipartSuffix) { + // If the entry was detected as a multipart file we use + // getMultipartObjectInfo() to fill the FileInfo structure. + entry = strings.Trim(entry, multipartSuffix) + var info MultipartObjectInfo + info, err = getMultipartObjectInfo(disk, bucket, path.Join(prefixDir, entry)) + if err != nil { + return + } + // Set the Mode to a "regular" file. + fileInfo.Mode = 0 + // Trim the suffix that was temporarily added to indicate that this + // is a multipart file. + fileInfo.Name = path.Join(prefixDir, entry) + fileInfo.Size = info.Size + fileInfo.MD5Sum = info.MD5Sum + fileInfo.ModTime = info.ModTime + return + } if fileInfo, err = disk.StatFile(bucket, path.Join(prefixDir, entry)); err != nil { return } @@ -88,6 +117,7 @@ func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker strin send(treeWalkResult{err: err}) return false } + if entryPrefixMatch != "" { for i, entry := range entries { if !strings.HasPrefix(entry, entryPrefixMatch) { @@ -98,7 +128,14 @@ func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker strin } } } - sort.StringSlice(entries).Sort() + // For XL multipart files strip the trailing "/" and append ".minio.multipart" to the entry so that + // entryToFileInfo() can call StatFile for regular files or getMultipartObjectInfo() for multipart files. + for i, entry := range entries { + if isXL && strings.HasSuffix(entry, slashSeparator) && isLeafDirectory(disk, bucket, path.Join(prefixDir, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) + multipartSuffix + } + } + sort.Sort(byMultipartFiles(entries)) // Skip the empty strings for len(entries) > 0 && entries[0] == "" { entries = entries[1:] @@ -109,7 +146,7 @@ func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker strin // example: // If markerDir="four/" Search() returns the index of "four/" in the sorted // entries list so we skip all the entries till "four/" - idx := sort.StringSlice(entries).Search(markerDir) + idx := sort.Search(len(entries), func(i int) bool { return strings.TrimSuffix(entries[i], multipartSuffix) >= markerDir }) entries = entries[idx:] *count += len(entries) for i, entry := range entries { @@ -140,7 +177,7 @@ func treeWalk(disk StorageAPI, bucket, prefixDir, entryPrefixMatch, marker strin } *count-- prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if !treeWalk(disk, bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) { + if !treeWalk(layer, bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) { return false } continue @@ -170,13 +207,6 @@ func startTreeWalk(layer ObjectLayer, bucket, prefix, marker string, recursive b // if prefix is "one/two/th" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" // and entryPrefixMatch="th" - var disk StorageAPI - switch l := layer.(type) { - case xlObjects: - disk = l.storage - case fsObjects: - disk = l.storage - } ch := make(chan treeWalkResult, maxObjectList) walkNotify := treeWalker{ch: ch} @@ -204,7 +234,7 @@ func startTreeWalk(layer ObjectLayer, bucket, prefix, marker string, recursive b return false } } - treeWalk(disk, bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count) + treeWalk(layer, bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count) }() return &walkNotify } diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 5f580096a..806df8602 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "path" + "strings" "time" ) @@ -41,6 +42,16 @@ type MultipartObjectInfo struct { MD5Sum string } +type byMultipartFiles []string + +func (files byMultipartFiles) Len() int { return len(files) } +func (files byMultipartFiles) Less(i, j int) bool { + first := strings.TrimSuffix(files[i], multipartSuffix) + second := strings.TrimSuffix(files[j], multipartSuffix) + return first < second +} +func (files byMultipartFiles) Swap(i, j int) { files[i], files[j] = files[j], files[i] } + // GetPartNumberOffset - given an offset for the whole object, return the part and offset in that part. func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, partOffset int64, err error) { partOffset = offset diff --git a/xl-objects.go b/xl-objects.go index 8fce20b37..d0b7b9b0c 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -19,12 +19,10 @@ package main import ( "encoding/json" "io" - "path" "path/filepath" "strings" "sync" - "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/mimedb" ) @@ -163,17 +161,17 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { } fi, err := xl.storage.StatFile(bucket, object) if err != nil { - if err == errFileNotFound { - var info MultipartObjectInfo - info, err = getMultipartObjectInfo(xl.storage, bucket, object) - if err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - fi.Size = info.Size - fi.ModTime = info.ModTime - fi.MD5Sum = info.MD5Sum + if err != errFileNotFound { + return ObjectInfo{}, toObjectErr(err, bucket, object) } - return ObjectInfo{}, toObjectErr(err, bucket, object) + var info MultipartObjectInfo + info, err = getMultipartObjectInfo(xl.storage, bucket, object) + if err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + fi.Size = info.Size + fi.ModTime = info.ModTime + fi.MD5Sum = info.MD5Sum } contentType := "application/octet-stream" if objectExt := filepath.Ext(object); objectExt != "" { @@ -246,143 +244,5 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { } func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(xl.storage, bucket); err != nil { - return ListObjectsInfo{}, err - } else if !isExist { - return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} - } - if !IsValidObjectPrefix(prefix) { - return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListObjectsInfo{}, UnsupportedDelimiter{ - Delimiter: delimiter, - } - } - // Verify if marker has prefix. - if marker != "" { - if !strings.HasPrefix(marker, prefix) { - return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ - Marker: marker, - Prefix: prefix, - } - } - } - - if maxKeys == 0 { - return ListObjectsInfo{}, nil - } - - // Default is recursive, if delimiter is set then list non recursive. - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - walker := lookupTreeWalk(xl, listParams{bucket, recursive, marker, prefix}) - if walker == nil { - walker = startTreeWalk(xl, bucket, prefix, marker, recursive) - } - var fileInfos []FileInfo - var eof bool - var err error - var nextMarker string - log.Debugf("Reading from the tree walk channel has begun.") - for i := 0; i < maxKeys; { - walkResult, ok := <-walker.ch - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - log.WithFields(logrus.Fields{ - "bucket": bucket, - "prefix": prefix, - "marker": marker, - "recursive": recursive, - }).Debugf("Walk resulted in an error %s", walkResult.err) - // File not found is a valid case. - if walkResult.err == errFileNotFound { - return ListObjectsInfo{}, nil - } - return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) - } - fileInfo := walkResult.fileInfo - if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) { - // Code flow reaches here for non-recursive listing. - - var info MultipartObjectInfo - info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) - if err == nil { - // Set the Mode to a "regular" file. - fileInfo.Mode = 0 - fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator) - fileInfo.Size = info.Size - fileInfo.MD5Sum = info.MD5Sum - fileInfo.ModTime = info.ModTime - } else if err != errFileNotFound { - return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) - } - } else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) { - // Code flow reaches here for recursive listing. - - // for object/00000.minio.multipart, strip the base name - // and calculate get the object size. - fileInfo.Name = path.Dir(fileInfo.Name) - var info MultipartObjectInfo - info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name) - if err != nil { - return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name) - } - fileInfo.Size = info.Size - } else if strings.HasSuffix(fileInfo.Name, multipartSuffix) { - // Ignore the part files like object/00001.minio.multipart - continue - } - nextMarker = fileInfo.Name - fileInfos = append(fileInfos, fileInfo) - if walkResult.end { - eof = true - break - } - i++ - } - params := listParams{bucket, recursive, nextMarker, prefix} - log.WithFields(logrus.Fields{ - "bucket": params.bucket, - "recursive": params.recursive, - "marker": params.marker, - "prefix": params.prefix, - }).Debugf("Save the tree walk into map for subsequent requests.") - if !eof { - saveTreeWalk(xl, params, walker) - } - - result := ListObjectsInfo{IsTruncated: !eof} - - for _, fileInfo := range fileInfos { - // With delimiter set we fill in NextMarker and Prefixes. - if delimiter == slashSeparator { - result.NextMarker = fileInfo.Name - if fileInfo.Mode.IsDir() { - result.Prefixes = append(result.Prefixes, fileInfo.Name) - continue - } - } - result.Objects = append(result.Objects, ObjectInfo{ - Name: fileInfo.Name, - ModTime: fileInfo.ModTime, - Size: fileInfo.Size, - IsDir: false, - }) - } - return result, nil + return listObjectsCommon(xl, bucket, prefix, marker, delimiter, maxKeys) }