diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index ad02ee121..28f8d72a7 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -24,6 +24,7 @@ import ( "io" "io/ioutil" "path" + "sort" "strconv" "strings" @@ -31,6 +32,45 @@ import ( "github.com/skyrings/skyring-common/tools/uuid" ) +// MultipartPartInfo Info of each part kept in the multipart metadata file after +// CompleteMultipartUpload() is called. +type MultipartPartInfo struct { + PartNumber int + ETag string + Size int64 +} + +// MultipartObjectInfo - contents of the multipart metadata file after +// CompleteMultipartUpload() is called. +type MultipartObjectInfo []MultipartPartInfo + +// GetSize - Return the size of the object. +func (m MultipartObjectInfo) GetSize() (size int64) { + for _, part := range m { + size += part.Size + } + return +} + +// GetPartNumberOffset - given an offset for the whole object, return the part and offset in that part. +func (m MultipartObjectInfo) GetPartNumberOffset(offset int64) (partIndex int, partOffset int64, err error) { + partOffset = offset + for i, part := range m { + partIndex = i + if partOffset < part.Size { + return + } + partOffset -= part.Size + } + // Offset beyond the size of the object + err = errUnexpected + return +} + +func partNumToPartFileName(partNum int) string { + return fmt.Sprintf("%.5d%s", partNum, multipartSuffix) +} + // listLeafEntries - lists all entries if a given prefixPath is a leaf // directory, returns error if any - returns empty list if prefixPath // is not a leaf directory. 
@@ -326,8 +366,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s return "", InvalidUploadID{UploadID: uploadID} } - partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partID, md5Hex) - fileWriter, err := xl.storage.CreateFile(minioMetaVolume, path.Join(bucket, object, partSuffix)) + partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) + partSuffixPath := path.Join(bucket, object, partSuffix) + fileWriter, err := xl.storage.CreateFile(minioMetaVolume, partSuffixPath) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -369,6 +410,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if err != nil { return "", err } + partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex) + partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5) + err = xl.storage.RenameFile(minioMetaVolume, partSuffixPath, minioMetaVolume, partSuffixMD5Path) + if err != nil { + return "", err + } return newMD5Hex, nil } @@ -392,7 +439,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // Figure out the marker for the next subsequent calls, if the // partNumberMarker is already set. if partNumberMarker > 0 { - partNumberMarkerPath := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "." + partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "." 
fileInfos, _, err := xl.storage.ListFiles(minioMetaVolume, partNumberMarkerPath, "", false, 1) if err != nil { return result, toObjectErr(err, minioMetaVolume, partNumberMarkerPath) @@ -449,12 +496,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } else if !status { return "", (InvalidUploadID{UploadID: uploadID}) } - + sort.Sort(completedParts(parts)) + var metadata MultipartObjectInfo + for _, part := range parts { + partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) + fi, err := xl.storage.StatFile(minioMetaVolume, path.Join(bucket, object, partSuffix)) + if err != nil { + return "", err + } + metadata = append(metadata, MultipartPartInfo{part.PartNumber, part.ETag, fi.Size}) + } var md5Sums []string for _, part := range parts { // Construct part suffix. - partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag) - err := xl.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, fmt.Sprint(part.PartNumber))) + partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) + err := xl.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, partNumToPartFileName(part.PartNumber))) // We need a way to roll back if of the renames failed. 
if err != nil { return "", err @@ -464,7 +520,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if w, err := xl.storage.CreateFile(bucket, pathJoin(object, multipartMetaFile)); err == nil { var b []byte - b, err = json.Marshal(parts) + b, err = json.Marshal(metadata) if err != nil { return "", err } diff --git a/xl-objects.go b/xl-objects.go index 26011b782..c42afcaf7 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -20,8 +20,8 @@ import ( "crypto/md5" "encoding/hex" "encoding/json" - "fmt" "io" + "path" "path/filepath" "strings" @@ -29,7 +29,8 @@ import ( ) const ( - multipartMetaFile = "multipart.json" + multipartSuffix = ".minio.multipart" + multipartMetaFile = "00000" + multipartSuffix ) // xlObjects - Implements fs object layer. @@ -72,25 +73,6 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // GetObject - get an object. func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) { - findPartOffset := func(parts completedParts) (partIndex int, partOffset int64, err error) { - partOffset = startOffset - for i, part := range parts { - partIndex = i - var fileInfo FileInfo - fileInfo, err = xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber))) - if err != nil { - return - } - if partOffset < fileInfo.Size { - return - } - partOffset -= fileInfo.Size - } - // Offset beyond the size of the object - err = errUnexpected - return - } - // Verify if bucket is valid. 
if !IsValidBucketName(bucket) { return nil, BucketNameInvalid{Bucket: bucket} @@ -111,18 +93,18 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read return nil, toObjectErr(err, bucket, object) } fileReader, fileWriter := io.Pipe() - parts, err := xl.getParts(bucket, object) + info, err := xl.getMultipartObjectInfo(bucket, object) if err != nil { return nil, toObjectErr(err, bucket, object) } - partIndex, offset, err := findPartOffset(parts) + partIndex, offset, err := info.GetPartNumberOffset(startOffset) if err != nil { return nil, toObjectErr(err, bucket, object) } go func() { - for ; partIndex < len(parts); partIndex++ { - part := parts[partIndex] - r, err := xl.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)), offset) + for ; partIndex < len(info); partIndex++ { + part := info[partIndex] + r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset) if err != nil { fileWriter.CloseWithError(err) return @@ -138,38 +120,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read } // Return the parts of a multipart upload. -func (xl xlObjects) getParts(bucket, object string) (parts completedParts, err error) { +func (xl xlObjects) getMultipartObjectInfo(bucket, object string) (info MultipartObjectInfo, err error) { offset := int64(0) r, err := xl.storage.ReadFile(bucket, pathJoin(object, multipartMetaFile), offset) if err != nil { return } - // FIXME: what if multipart.json is > 4MB - b := make([]byte, 4*1024*1024) - n, err := io.ReadFull(r, b) - if err != nil && err != io.ErrUnexpectedEOF { - return - } - b = b[:n] - err = json.Unmarshal(b, &parts) - if err != nil { - return - } + decoder := json.NewDecoder(r) + err = decoder.Decode(&info) return } // GetObjectInfo - get object info. 
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { - getMultpartFileSize := func(parts completedParts) (size int64) { - for _, part := range parts { - fi, err := xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber))) - if err != nil { - continue - } - size += fi.Size - } - return size - } // Verify if bucket is valid. if !IsValidBucketName(bucket) { return ObjectInfo{}, BucketNameInvalid{Bucket: bucket} @@ -180,11 +143,11 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { } fi, err := xl.storage.StatFile(bucket, object) if err != nil { - parts, err := xl.getParts(bucket, object) + info, err := xl.getMultipartObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } - fi.Size = getMultpartFileSize(parts) + fi.Size = info.GetSize() } contentType := "application/octet-stream" if objectExt := filepath.Ext(object); objectExt != "" { @@ -287,6 +250,8 @@ func (xl xlObjects) DeleteObject(bucket, object string) error { return nil } +// TODO - support non-recursive case, figure out file size for files uploaded using multipart. + func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -311,21 +276,71 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey } } + if maxKeys == 0 { + return ListObjectsInfo{}, nil + } + // Default is recursive, if delimiter is set then list non recursive. 
recursive := true if delimiter == slashSeparator { recursive = false } - fileInfos, eof, err := xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) - if err != nil { - return ListObjectsInfo{}, toObjectErr(err, bucket) - } - if maxKeys == 0 { - return ListObjectsInfo{}, nil + var allFileInfos, fileInfos []FileInfo + var eof bool + var err error + for { + fileInfos, eof, err = xl.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) + if err != nil { + return ListObjectsInfo{}, toObjectErr(err, bucket) + } + for _, fileInfo := range fileInfos { + // FIXME: use fileInfo.Mode.IsDir() instead after fixing the bug in + // XL listing which is not resetting the Mode to 0 for leaf dirs. + if strings.HasSuffix(fileInfo.Name, slashSeparator) { + if isLeafDirectory(xl.storage, bucket, fileInfo.Name) { + fileInfo.Name = strings.TrimSuffix(fileInfo.Name, slashSeparator) + // Set the Mode to a "regular" file. + fileInfo.Mode = 0 + var info MultipartObjectInfo + info, err = xl.getMultipartObjectInfo(bucket, fileInfo.Name) + if err != nil { + return ListObjectsInfo{}, toObjectErr(err, bucket) + } + fileInfo.Size = info.GetSize() + allFileInfos = append(allFileInfos, fileInfo) + maxKeys-- + continue + } + } + if strings.HasSuffix(fileInfo.Name, multipartMetaFile) { + fileInfo.Name = path.Dir(fileInfo.Name) + var info MultipartObjectInfo + info, err = xl.getMultipartObjectInfo(bucket, fileInfo.Name) + if err != nil { + return ListObjectsInfo{}, toObjectErr(err, bucket) + } + fileInfo.Size = info.GetSize() + allFileInfos = append(allFileInfos, fileInfo) + maxKeys-- + continue + } + if strings.HasSuffix(fileInfo.Name, multipartSuffix) { + continue + } + allFileInfos = append(allFileInfos, fileInfo) + maxKeys-- + } + if maxKeys == 0 { + break + } + if eof { + break + } } result := ListObjectsInfo{IsTruncated: !eof} - for _, fileInfo := range fileInfos { + + for _, fileInfo := range allFileInfos { // With delimiter set we fill in NextMarker and Prefixes. 
if delimiter == slashSeparator { result.NextMarker = fileInfo.Name