XL - fixes mostly related to multipart listing. (#1441)

* XL/Multipart: Use json.NewDecoder to decode read stream.

* XL/Multipart: fix recursive and non-recursive listing.

* XL/Multipart: Create object part with md5sum later using RenameFile.

* XL/Multipart: ListObjectParts should list parts in order.

previously: uploadID.10.md5sum < uploadID.2.md5sum
fix       : uploadID.00010.md5sum > uploadID.00002.md5sum

* XL/Multipart: Keep the size of each part in the multipart metadata file to avoid stats on the parts.

* XL/Multipart: fix listing bug which was showing size of the multipart uploaded objects as 0.
This commit is contained in:
Krishna Srinivas 2016-05-02 06:22:16 +05:30 committed by Harshavardhana
parent ba7a55c321
commit 286de4de2c
2 changed files with 135 additions and 64 deletions

View File

@ -24,6 +24,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"path" "path"
"sort"
"strconv" "strconv"
"strings" "strings"
@ -31,6 +32,45 @@ import (
"github.com/skyrings/skyring-common/tools/uuid" "github.com/skyrings/skyring-common/tools/uuid"
) )
// MultipartPartInfo Info of each part kept in the multipart metadata file after
// CompleteMultipartUpload() is called.
type MultipartPartInfo struct {
PartNumber int
ETag string
Size int64
}
// MultipartObjectInfo - contents of the multipart metadata file after
// CompleteMultipartUpload() is called.
type MultipartObjectInfo []MultipartPartInfo
// GetSize - Return the size of the object.
func (m MultipartObjectInfo) GetSize() (size int64) {
for _, part := range m {
size += part.Size
}
return
}
// GetPartNumberOffset - given an offset for the whole object, return the part and offset in that part.
func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, partOffset int64, err error) {
partOffset = offset
for i, part := range m {
partIndex = i
if partOffset < part.Size {
return
}
partOffset -= part.Size
}
// Offset beyond the size of the object
err = errUnexpected
return
}
func partNumToPartFileName(partNum int) string {
return fmt.Sprintf("%.5d%s", partNum, multipartSuffix)
}
// listLeafEntries - lists all entries if a given prefixPath is a leaf // listLeafEntries - lists all entries if a given prefixPath is a leaf
// directory, returns error if any - returns empty list if prefixPath // directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory. // is not a leaf directory.
@ -326,8 +366,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partID, md5Hex) partSuffix := fmt.Sprintf("%s.%d", uploadID, partID)
fileWriter, err := xl.storage.CreateFile(minioMetaVolume, path.Join(bucket, object, partSuffix)) partSuffixPath := path.Join(bucket, object, partSuffix)
fileWriter, err := xl.storage.CreateFile(minioMetaVolume, partSuffixPath)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -369,6 +410,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if err != nil { if err != nil {
return "", err return "", err
} }
partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex)
partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5)
err = xl.storage.RenameFile(minioMetaVolume, partSuffixPath, minioMetaVolume, partSuffixMD5Path)
if err != nil {
return "", err
}
return newMD5Hex, nil return newMD5Hex, nil
} }
@ -392,7 +439,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
// Figure out the marker for the next subsequent calls, if the // Figure out the marker for the next subsequent calls, if the
// partNumberMarker is already set. // partNumberMarker is already set.
if partNumberMarker > 0 { if partNumberMarker > 0 {
partNumberMarkerPath := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "." partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "."
fileInfos, _, err := xl.storage.ListFiles(minioMetaVolume, partNumberMarkerPath, "", false, 1) fileInfos, _, err := xl.storage.ListFiles(minioMetaVolume, partNumberMarkerPath, "", false, 1)
if err != nil { if err != nil {
return result, toObjectErr(err, minioMetaVolume, partNumberMarkerPath) return result, toObjectErr(err, minioMetaVolume, partNumberMarkerPath)
@ -449,12 +496,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} else if !status { } else if !status {
return "", (InvalidUploadID{UploadID: uploadID}) return "", (InvalidUploadID{UploadID: uploadID})
} }
sort.Sort(completedParts(parts))
var metadata MultipartObjectInfo
for _, part := range parts {
partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag)
fi, err := xl.storage.StatFile(minioMetaVolume, path.Join(bucket, object, partSuffix))
if err != nil {
return "", err
}
metadata = append(metadata, MultipartPartInfo{part.PartNumber, part.ETag, fi.Size})
}
var md5Sums []string var md5Sums []string
for _, part := range parts { for _, part := range parts {
// Construct part suffix. // Construct part suffix.
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag) partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag)
err := xl.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, fmt.Sprint(part.PartNumber))) err := xl.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, partNumToPartFileName(part.PartNumber)))
// We need a way to roll back if of the renames failed. // We need a way to roll back if of the renames failed.
if err != nil { if err != nil {
return "", err return "", err
@ -464,7 +520,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if w, err := xl.storage.CreateFile(bucket, pathJoin(object, multipartMetaFile)); err == nil { if w, err := xl.storage.CreateFile(bucket, pathJoin(object, multipartMetaFile)); err == nil {
var b []byte var b []byte
b, err = json.Marshal(parts) b, err = json.Marshal(metadata)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -20,8 +20,8 @@ import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"path"
"path/filepath" "path/filepath"
"strings" "strings"
@ -29,7 +29,8 @@ import (
) )
const ( const (
multipartMetaFile = "multipart.json" multipartSuffix = ".minio.multipart"
multipartMetaFile = "00000" + multipartSuffix
) )
// xlObjects - Implements fs object layer. // xlObjects - Implements fs object layer.
@ -72,25 +73,6 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// GetObject - get an object. // GetObject - get an object.
func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) { func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) {
findPartOffset := func(parts completedParts) (partIndex int, partOffset int64, err error) {
partOffset = startOffset
for i, part := range parts {
partIndex = i
var fileInfo FileInfo
fileInfo, err = xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)))
if err != nil {
return
}
if partOffset < fileInfo.Size {
return
}
partOffset -= fileInfo.Size
}
// Offset beyond the size of the object
err = errUnexpected
return
}
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return nil, BucketNameInvalid{Bucket: bucket} return nil, BucketNameInvalid{Bucket: bucket}
@ -111,18 +93,18 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
fileReader, fileWriter := io.Pipe() fileReader, fileWriter := io.Pipe()
parts, err := xl.getParts(bucket, object) info, err := xl.getMultipartObjectInfo(bucket, object)
if err != nil { if err != nil {
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
partIndex, offset, err := findPartOffset(parts) partIndex, offset, err := info.GetPartNumberOffset(startOffset)
if err != nil { if err != nil {
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
go func() { go func() {
for ; partIndex < len(parts); partIndex++ { for ; partIndex < len(info); partIndex++ {
part := parts[partIndex] part := info[partIndex]
r, err := xl.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)), offset) r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset)
if err != nil { if err != nil {
fileWriter.CloseWithError(err) fileWriter.CloseWithError(err)
return return
@ -138,38 +120,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
} }
// Return the parts of a multipart upload. // Return the parts of a multipart upload.
func (xl xlObjects) getParts(bucket, object string) (parts completedParts, err error) { func (xl xlObjects) getMultipartObjectInfo(bucket, object string) (info MultipartObjectInfo, err error) {
offset := int64(0) offset := int64(0)
r, err := xl.storage.ReadFile(bucket, pathJoin(object, multipartMetaFile), offset) r, err := xl.storage.ReadFile(bucket, pathJoin(object, multipartMetaFile), offset)
if err != nil { if err != nil {
return return
} }
// FIXME: what if multipart.json is > 4MB decoder := json.NewDecoder(r)
b := make([]byte, 4*1024*1024) err = decoder.Decode(&info)
n, err := io.ReadFull(r, b)
if err != nil && err != io.ErrUnexpectedEOF {
return
}
b = b[:n]
err = json.Unmarshal(b, &parts)
if err != nil {
return
}
return return
} }
// GetObjectInfo - get object info. // GetObjectInfo - get object info.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
getMultpartFileSize := func(parts completedParts) (size int64) {
for _, part := range parts {
fi, err := xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)))
if err != nil {
continue
}
size += fi.Size
}
return size
}
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return ObjectInfo{}, BucketNameInvalid{Bucket: bucket} return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
@ -180,11 +143,11 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
} }
fi, err := xl.storage.StatFile(bucket, object) fi, err := xl.storage.StatFile(bucket, object)
if err != nil { if err != nil {
parts, err := xl.getParts(bucket, object) info, err := xl.getMultipartObjectInfo(bucket, object)
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
fi.Size = getMultpartFileSize(parts) fi.Size = info.GetSize()
} }
contentType := "application/octet-stream" contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" { if objectExt := filepath.Ext(object); objectExt != "" {
@ -287,6 +250,8 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
return nil return nil
} }
// TODO - support non-recursive case, figure out file size for files uploaded using multipart.
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
@ -311,21 +276,71 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
} }
} }
if maxKeys == 0 {
return ListObjectsInfo{}, nil
}
// Default is recursive, if delimiter is set then list non recursive. // Default is recursive, if delimiter is set then list non recursive.
recursive := true recursive := true
if delimiter == slashSeparator { if delimiter == slashSeparator {
recursive = false recursive = false
} }
fileInfos, eof, err := xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) var allFileInfos, fileInfos []FileInfo
if err != nil { var eof bool
return ListObjectsInfo{}, toObjectErr(err, bucket) var err error
} for {
if maxKeys == 0 { fileInfos, eof, err = xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys)
return ListObjectsInfo{}, nil if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket)
}
for _, fileInfo := range fileInfos {
// FIXME: use fileInfo.Mode.IsDir() instead after fixing the bug in
// XL listing which is not reseting the Mode to 0 for leaf dirs.
if strings.HasSuffix(fileInfo.Name, slashSeparator) {
if isLeafDirectory(xl.storage, bucket, fileInfo.Name) {
fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator)
// Set the Mode to a "regular" file.
fileInfo.Mode = 0
var info MultipartObjectInfo
info, err = xl.getMultipartObjectInfo(bucket, fileInfo.Name)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket)
}
fileInfo.Size = info.GetSize()
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
continue
}
}
if strings.HasSuffix(fileInfo.Name, multipartMetaFile) {
fileInfo.Name = path.Dir(fileInfo.Name)
var info MultipartObjectInfo
info, err = xl.getMultipartObjectInfo(bucket, fileInfo.Name)
if err != nil {
return ListObjectsInfo{}, toObjectErr(err, bucket)
}
fileInfo.Size = info.GetSize()
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
continue
}
if strings.HasSuffix(fileInfo.Name, multipartSuffix) {
continue
}
allFileInfos = append(allFileInfos, fileInfo)
maxKeys--
}
if maxKeys == 0 {
break
}
if eof {
break
}
} }
result := ListObjectsInfo{IsTruncated: !eof} result := ListObjectsInfo{IsTruncated: !eof}
for _, fileInfo := range fileInfos {
for _, fileInfo := range allFileInfos {
// With delimiter set we fill in NextMarker and Prefixes. // With delimiter set we fill in NextMarker and Prefixes.
if delimiter == slashSeparator { if delimiter == slashSeparator {
result.NextMarker = fileInfo.Name result.NextMarker = fileInfo.Name