XL/ListObjects: Fix ordering issue during listing if the files were uploaded as multipart uploads. (#1498) (#1506)

i.e if two files "tmp" and "tmp.1" are uploaded as multipart we would list ""tmp.1"" before ""tmp"" as "tmp.1/" < "tmp/"
This commit is contained in:
Krishna Srinivas
2016-05-06 22:49:09 +05:30
committed by Harshavardhana
parent 5133ea50bd
commit 48d3be36da
5 changed files with 192 additions and 278 deletions

View File

@@ -19,12 +19,10 @@ package main
import (
"encoding/json"
"io"
"path"
"path/filepath"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/mimedb"
)
@@ -163,17 +161,17 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}
fi, err := xl.storage.StatFile(bucket, object)
if err != nil {
if err == errFileNotFound {
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
fi.Size = info.Size
fi.ModTime = info.ModTime
fi.MD5Sum = info.MD5Sum
if err != errFileNotFound {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return ObjectInfo{}, toObjectErr(err, bucket, object)
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
fi.Size = info.Size
fi.ModTime = info.ModTime
fi.MD5Sum = info.MD5Sum
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" {
@@ -246,143 +244,5 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
}
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if isExist, err := isBucketExist(xl.storage, bucket); err != nil {
return ListObjectsInfo{}, err
} else if !isExist {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, UnsupportedDelimiter{
Delimiter: delimiter,
}
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
}
}
}
if maxKeys == 0 {
return ListObjectsInfo{}, nil
}
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == slashSeparator {
recursive = false
}
walker := lookupTreeWalk(xl, listParams{bucket, recursive, marker, prefix})
if walker == nil {
walker = startTreeWalk(xl, bucket, prefix, marker, recursive)
}
var fileInfos []FileInfo
var eof bool
var err error
var nextMarker string
log.Debugf("Reading from the tree walk channel has begun.")
for i := 0; i < maxKeys; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
log.WithFields(logrus.Fields{
"bucket": bucket,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err)
// File not found is a valid case.
if walkResult.err == errFileNotFound {
return ListObjectsInfo{}, nil
}
return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
}
fileInfo := walkResult.fileInfo
if strings.HasSuffix(fileInfo.Name, slashSeparator) && isLeafDirectory(xl.storage, bucket, fileInfo.Name) {
// Code flow reaches here for non-recursive listing.
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err == nil {
// Set the Mode to a "regular" file.
fileInfo.Mode = 0
fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator)
fileInfo.Size = info.Size
fileInfo.MD5Sum = info.MD5Sum
fileInfo.ModTime = info.ModTime
} else if err != errFileNotFound {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
} else if strings.HasSuffix(fileInfo.Name, multipartMetaFile) {
// Code flow reaches here for recursive listing.
// for object/00000.minio.multipart, strip the base name
// and calculate get the object size.
fileInfo.Name = path.Dir(fileInfo.Name)
var info MultipartObjectInfo
info, err = getMultipartObjectInfo(xl.storage, bucket, fileInfo.Name)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket, fileInfo.Name)
}
fileInfo.Size = info.Size
} else if strings.HasSuffix(fileInfo.Name, multipartSuffix) {
// Ignore the part files like object/00001.minio.multipart
continue
}
nextMarker = fileInfo.Name
fileInfos = append(fileInfos, fileInfo)
if walkResult.end {
eof = true
break
}
i++
}
params := listParams{bucket, recursive, nextMarker, prefix}
log.WithFields(logrus.Fields{
"bucket": params.bucket,
"recursive": params.recursive,
"marker": params.marker,
"prefix": params.prefix,
}).Debugf("Save the tree walk into map for subsequent requests.")
if !eof {
saveTreeWalk(xl, params, walker)
}
result := ListObjectsInfo{IsTruncated: !eof}
for _, fileInfo := range fileInfos {
// With delimiter set we fill in NextMarker and Prefixes.
if delimiter == slashSeparator {
result.NextMarker = fileInfo.Name
if fileInfo.Mode.IsDir() {
result.Prefixes = append(result.Prefixes, fileInfo.Name)
continue
}
}
result.Objects = append(result.Objects, ObjectInfo{
Name: fileInfo.Name,
ModTime: fileInfo.ModTime,
Size: fileInfo.Size,
IsDir: false,
})
}
return result, nil
return listObjectsCommon(xl, bucket, prefix, marker, delimiter, maxKeys)
}