xl/fs: ListObjectParts should set nextPartNumberMarker properly. (#1528)

List requests on uploads with more than 1000 parts would otherwise loop forever, since truncated responses never carried NextPartNumberMarker (see the sketch below).

Fixes #1522
Harshavardhana 2016-05-08 02:21:12 -07:00 committed by Anand Babu (AB) Periasamy
parent a56d5ef415
commit 56b7df90e1
4 changed files with 14 additions and 8 deletions
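For context, here is a minimal, self-contained sketch (not code from this commit) of the pagination contract the fix restores. listResult and listParts are stand-ins for ListPartsInfo and listObjectPartsCommon; only the Parts, IsTruncated and NextPartNumberMarker names are taken from the diff below.

package main

import "fmt"

// listResult mirrors just the fields of ListPartsInfo that matter for paging.
type listResult struct {
    Parts                []int
    IsTruncated          bool
    NextPartNumberMarker int
}

// listParts simulates a parts listing: it returns at most maxParts part
// numbers greater than partNumberMarker and, as in the fix, fills
// NextPartNumberMarker whenever the listing is truncated.
func listParts(totalParts, partNumberMarker, maxParts int) listResult {
    var result listResult
    for part := partNumberMarker + 1; part <= totalParts && len(result.Parts) < maxParts; part++ {
        result.Parts = append(result.Parts, part)
    }
    if len(result.Parts) > 0 && result.Parts[len(result.Parts)-1] < totalParts {
        result.IsTruncated = true
        result.NextPartNumberMarker = result.Parts[len(result.Parts)-1]
    }
    return result
}

func main() {
    // A client-style pagination loop: with 2500 parts and pages of 1000 this
    // finishes after three requests. If NextPartNumberMarker were left at
    // zero on truncated responses (the bug fixed here), the marker would
    // never advance and the loop would re-list the first page forever.
    marker, fetched := 0, 0
    for {
        res := listParts(2500, marker, 1000)
        fetched += len(res.Parts)
        if !res.IsTruncated {
            break
        }
        marker = res.NextPartNumberMarker
    }
    fmt.Println("total parts listed:", fetched) // prints: total parts listed: 2500
}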


@@ -118,7 +118,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
     }

     // Save the s3 md5.
-    s3MD5, err := makeS3MD5(md5Sums...)
+    s3MD5, err := completeMultipartMD5(md5Sums...)
     if err != nil {
         return "", err
     }


@@ -495,8 +495,7 @@ func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, up
     return result, nil
 }

-// ListObjectParts - list object parts, common function across both
-// object layers.
+// ListObjectParts - list object parts, common function across both object layers.
 func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
     // Verify if bucket is valid.
     if !IsValidBucketName(bucket) {
@@ -527,15 +526,17 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string,
     count := maxParts
     for _, entry := range newEntries {
         fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID, entry))
-        splitEntry := strings.Split(entry, ".")
-        partNum, err := strconv.Atoi(splitEntry[0])
+        splitEntry := strings.SplitN(entry, ".", 2)
+        partStr := splitEntry[0]
+        etagStr := splitEntry[1]
+        partNum, err := strconv.Atoi(partStr)
         if err != nil {
             return ListPartsInfo{}, err
         }
         result.Parts = append(result.Parts, partInfo{
             PartNumber:   partNum,
             LastModified: fi.ModTime,
-            ETag:         splitEntry[1],
+            ETag:         etagStr,
             Size:         fi.Size,
         })
         count--
@@ -543,8 +544,13 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string,
             break
         }
     }
+    // If listed entries are more than maxParts, we set IsTruncated as true.
     if len(newEntries) > len(result.Parts) {
         result.IsTruncated = true
+        // Make sure to fill next part number marker if IsTruncated is
+        // true for subsequent listing.
+        nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
+        result.NextPartNumberMarker = nextPartNumberMarker
     }
     result.Bucket = bucket
     result.Object = object
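As a side note on the SplitN change above, entry names have the form "<partNumber>.<etag>". A tiny standalone illustration (the entry value here is hypothetical, not taken from the commit):

package main

import (
    "fmt"
    "strconv"
    "strings"
)

func main() {
    // Hypothetical on-disk entry name: "<partNumber>.<etag>".
    entry := "00001.b6d81b360a5672d80c27430f39153e2c"

    // SplitN with a limit of 2 splits only on the first '.', so the whole
    // remainder is kept as the ETag even if it contained further dots.
    splitEntry := strings.SplitN(entry, ".", 2)
    partNum, err := strconv.Atoi(splitEntry[0])
    if err != nil {
        fmt.Println("malformed entry:", err)
        return
    }
    fmt.Printf("part number: %d, etag: %s\n", partNum, splitEntry[1])
    // Output: part number: 1, etag: b6d81b360a5672d80c27430f39153e2c
}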


@@ -109,7 +109,7 @@ func pathJoin(s1 string, s2 string) string {
 }

 // Create an s3 compatible MD5sum for complete multipart transaction.
-func makeS3MD5(md5Strs ...string) (string, error) {
+func completeMultipartMD5(md5Strs ...string) (string, error) {
     var finalMD5Bytes []byte
     for _, md5Str := range md5Strs {
         md5Bytes, err := hex.DecodeString(md5Str)
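The hunk above only shows the rename of makeS3MD5 to completeMultipartMD5 and the start of its decode loop. As a hedged illustration of the general S3 convention its doc comment refers to (an MD5 of the concatenated per-part MD5 bytes plus a "-<partCount>" suffix), a self-contained sketch could look like the following; the suffix format and everything past the decode loop are assumptions, not taken from this diff.

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
)

// s3MultipartMD5 sketches the usual S3 multipart ETag convention:
// concatenate the raw MD5 bytes of every part, MD5 that buffer, and
// append "-<partCount>".
func s3MultipartMD5(md5Strs ...string) (string, error) {
    var finalMD5Bytes []byte
    for _, md5Str := range md5Strs {
        md5Bytes, err := hex.DecodeString(md5Str)
        if err != nil {
            return "", err
        }
        finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
    }
    sum := md5.Sum(finalMD5Bytes)
    return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(md5Strs)), nil
}

func main() {
    part1 := md5.Sum([]byte("part one"))
    part2 := md5.Sum([]byte("part two"))
    etag, err := s3MultipartMD5(hex.EncodeToString(part1[:]), hex.EncodeToString(part2[:]))
    if err != nil {
        panic(err)
    }
    fmt.Println(etag) // prints "<32 hex chars>-2"
}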


@@ -141,7 +141,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
     }

     // Calculate and save s3 compatible md5sum.
-    s3MD5, err := makeS3MD5(md5Sums...)
+    s3MD5, err := completeMultipartMD5(md5Sums...)
     if err != nil {
         return "", err
     }