FS: Cleanup and Fix all multipart related operations. (#1836)

This commit is contained in:
Harshavardhana 2016-06-02 12:18:56 -07:00
parent de21126f7e
commit 67bba270a0
2 changed files with 300 additions and 200 deletions

70
fs-v1-multipart-common.go Normal file
View File

@ -0,0 +1,70 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"path"
"strings"
)
// Returns if the prefix is a multipart upload.
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
_, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
return err == nil
}
// listUploadsInfo - list all uploads info.
func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage)
if err != nil {
if err == errFileNotFound {
return []uploadInfo{}, nil
}
return nil, err
}
uploads = uploadIDs.Uploads
return uploads, nil
}
// Checks whether bucket exists.
func (fs fsObjects) isBucketExist(bucket string) bool {
// Check whether bucket exists.
_, err := fs.storage.StatVol(bucket)
if err != nil {
if err == errVolumeNotFound {
return false
}
errorIf(err, "Stat failed on bucket "+bucket+".")
return false
}
return true
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
_, err := fs.storage.StatFile(minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if err == errFileNotFound {
return false
}
errorIf(err, "Unable to access upload id"+uploadIDPath)
return false
}
return true
}

View File

@ -29,132 +29,9 @@ import (
"github.com/skyrings/skyring-common/tools/uuid" "github.com/skyrings/skyring-common/tools/uuid"
) )
// Checks whether bucket exists.
func (fs fsObjects) isBucketExist(bucket string) bool {
// Check whether bucket exists.
_, err := fs.storage.StatVol(bucket)
if err != nil {
if err == errVolumeNotFound {
return false
}
errorIf(err, "Stat failed on bucket "+bucket+".")
return false
}
return true
}
// newMultipartUpload - initialize a new multipart.
func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
// No metadata is set, allocate a new one.
if meta == nil {
meta = make(map[string]string)
}
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
uploadID = getUUID()
initiated := time.Now().UTC()
// Create 'uploads.json'
if err = writeUploadJSON(bucket, object, uploadID, initiated, fs.storage); err != nil {
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Return success.
return uploadID, nil
}
// Returns if the prefix is a multipart upload.
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
_, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
return err == nil
}
// listUploadsInfo - list all uploads info.
func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage)
if err != nil {
if err == errFileNotFound {
return []uploadInfo{}, nil
}
return nil, err
}
uploads = uploadIDs.Uploads
return uploads, nil
}
// listMultipartUploads - lists all multipart uploads. // listMultipartUploads - lists all multipart uploads.
func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{} result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
if !fs.isBucketExist(bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
Delimiter: delimiter,
}
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return result, err
}
if id.IsZero() {
return result, MalformedUploadID{
UploadID: uploadIDMarker,
}
}
}
recursive := true recursive := true
if delimiter == slashSeparator { if delimiter == slashSeparator {
recursive = false recursive = false
@ -181,7 +58,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var err error var err error
var eof bool var eof bool
if uploadIDMarker != "" { if uploadIDMarker != "" {
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
if err != nil { if err != nil {
return ListMultipartsInfo{}, err return ListMultipartsInfo{}, err
} }
@ -227,7 +106,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var tmpUploads []uploadMetadata var tmpUploads []uploadMetadata
var end bool var end bool
uploadIDMarker = "" uploadIDMarker = ""
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
if err != nil { if err != nil {
return ListMultipartsInfo{}, err return ListMultipartsInfo{}, err
} }
@ -264,20 +145,109 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
return result, nil return result, nil
} }
// ListMultipartUploads - list multipart uploads. // ListMultipartUploads - lists all the pending multipart uploads on a
// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a
// delimiter' which allows us to list uploads match a particular
// prefix or lexically starting from 'keyMarker' or delimiting the
// output to get a directory like listing.
//
// Implements S3 compatible ListMultipartUploads API. The resulting
// ListMultipartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// Validate input arguments.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
if !fs.isBucketExist(bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
Delimiter: delimiter,
}
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return ListMultipartsInfo{}, InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return ListMultipartsInfo{}, err
}
if id.IsZero() {
return ListMultipartsInfo{}, MalformedUploadID{
UploadID: uploadIDMarker,
}
}
}
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
} }
// NewMultipartUpload - initialize a new multipart upload, returns a unique id. // newMultipartUpload - wrapper for initializing a new multipart
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { // request, returns back a unique upload id.
meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs. //
return fs.newMultipartUpload(bucket, object, meta) // Internally this function creates 'uploads.json' associated for the
// incoming object at '.minio/multipart/bucket/object/uploads.json' on
// all the disks. `uploads.json` carries metadata regarding on going
// multipart operation on the object.
func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// No metadata is set, allocate a new one.
if meta == nil {
meta = make(map[string]string)
}
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
uploadID = getUUID()
initiated := time.Now().UTC()
// Create 'uploads.json'
if err = writeUploadJSON(bucket, object, uploadID, initiated, fs.storage); err != nil {
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Return success.
return uploadID, nil
} }
// putObjectPartCommon - put object part. // NewMultipartUpload - initialize a new multipart upload, returns a
func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { // unique id. The unique id returned here is of UUID form, for each
// Verify if bucket is valid. // subsequent request each UUID is unique.
//
// Implements S3 compatible initiate multipart API.
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs.
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket} return "", BucketNameInvalid{Bucket: bucket}
} }
@ -285,16 +255,18 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
if !fs.isBucketExist(bucket) { if !fs.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket} return "", BucketNotFound{Bucket: bucket}
} }
// Verify if object name is valid.
if !IsValidObjectName(object) { if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object} return "", ObjectNameInvalid{Bucket: bucket, Object: object}
} }
if !fs.isUploadIDExists(bucket, object, uploadID) { return fs.newMultipartUpload(bucket, object, meta)
return "", InvalidUploadID{UploadID: uploadID} }
}
// Hold read lock on the uploadID so that no one aborts it.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
// putObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio/tmp' location and safely renamed to
// '.minio/multipart' for reach parts.
func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Hold write lock on the part so that there is no parallel upload on the part. // Hold write lock on the part so that there is no parallel upload on the part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
@ -362,29 +334,39 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
return newMD5Hex, nil return newMD5Hex, nil
} }
// PutObjectPart - writes the multipart upload chunks. // PutObjectPart - reads incoming stream and internally erasure codes
// them. This call is similar to single put operation but it is part
// of the multipart transcation.
//
// Implements S3 compatible Upload Part API.
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
return fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
}
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} return "", BucketNameInvalid{Bucket: bucket}
} }
// Verify whether the bucket exists. // Verify whether the bucket exists.
if !fs.isBucketExist(bucket) { if !fs.isBucketExist(bucket) {
return ListPartsInfo{}, BucketNotFound{Bucket: bucket} return "", BucketNotFound{Bucket: bucket}
} }
if !IsValidObjectName(object) { if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} return "", ObjectNameInvalid{Bucket: bucket, Object: object}
} }
// Hold read lock on the uploadID so that no one aborts it.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if !fs.isUploadIDExists(bucket, object, uploadID) { if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. md5Sum, err := fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) return md5Sum, err
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) }
// listObjectParts - wrapper scanning through
// '.minio/multipart/bucket/object/UPLOADID'. Lists all the parts
// saved inside '.minio/multipart/bucket/object/UPLOADID'.
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
result := ListPartsInfo{} result := ListPartsInfo{}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
@ -432,26 +414,41 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
return result, nil return result, nil
} }
// ListObjectParts - list all parts. // ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID. Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return ListPartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
}
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
} }
// isUploadIDExists - verify if a given uploadID exists and is valid. // CompleteMultipartUpload - completes an ongoing multipart
func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool { // transaction after receiving all the parts indicated by the client.
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) // Returns an md5sum calculated by concatenating all the individual
_, err := fs.storage.StatFile(minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) // md5sums of all the parts.
if err != nil { //
if err == errFileNotFound { // Implements S3 compatible Complete multipart API.
return false
}
errorIf(err, "Unable to access upload id"+uploadIDPath)
return false
}
return true
}
// CompleteMultipartUpload - implement complete multipart upload transaction.
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
@ -467,10 +464,25 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
Object: object, Object: object,
} }
} }
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if !fs.isUploadIDExists(bucket, object, uploadID) { if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
// Read saved fs metadata for ongoing multipart.
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Calculate s3 compatible md5sum for complete multipart. // Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...) s3MD5, err := completeMultipartMD5(parts...)
if err != nil { if err != nil {
@ -482,23 +494,22 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// Loop through all parts, validate them and then commit to disk. // Loop through all parts, validate them and then commit to disk.
for i, part := range parts { for i, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
return "", InvalidPart{}
}
if fsMeta.Parts[partIdx].ETag != part.ETag {
return "", BadDigest{}
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) {
return "", PartTooSmall{}
}
// Construct part suffix. // Construct part suffix.
partSuffix := fmt.Sprintf("object%d", part.PartNumber) partSuffix := fmt.Sprintf("object%d", part.PartNumber)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
var fi FileInfo
fi, err = fs.storage.StatFile(minioMetaBucket, multipartPartFile)
if err != nil {
if err == errFileNotFound {
return "", InvalidPart{}
}
return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fi.Size) {
return "", PartTooSmall{}
}
offset := int64(0) offset := int64(0)
totalLeft := fi.Size totalLeft := fsMeta.Parts[partIdx].Size
for totalLeft > 0 { for totalLeft > 0 {
var n int64 var n int64
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer) n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer)
@ -535,26 +546,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return s3MD5, nil return s3MD5, nil
} }
// abortMultipartUpload - aborts a multipart upload. // abortMultipartUpload - wrapper for purging an ongoing multipart
// transaction, deletes uploadID entry from `uploads.json` and purges
// the directory at '.minio/multipart/bucket/object/uploadID' holding
// all the upload parts.
func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error { func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !fs.isBucketExist(bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
if !fs.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
}
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
// Cleanup all uploaded parts. // Cleanup all uploaded parts.
if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil { if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
return err return err
@ -568,6 +564,8 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
if uploadIDIdx != -1 { if uploadIDIdx != -1 {
uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
} }
// There are pending uploads for the same object, preserve
// them update 'uploads.json' in-place.
if len(uploadsJSON.Uploads) > 0 { if len(uploadsJSON.Uploads) > 0 {
err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage) err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage)
if err != nil { if err != nil {
@ -575,14 +573,46 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
} }
return nil return nil
} }
} } // No more pending uploads for the object, we purge the entire
// entry at '.minio/multipart/bucket/object'.
if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil {
return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
} }
return nil return nil
} }
// AbortMultipartUpload - aborts an multipart upload. // AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID. This is an atomic operation
// doesn't require clients to initiate multiple such requests.
//
// All parts are purged from all disks and reference to the uploadID
// would be removed from the system, rollback is not possible on this
// operation.
//
// Implements S3 compatible Abort multipart API, slight difference is
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be
// honored.
func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error { func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
return fs.abortMultipartUpload(bucket, object, uploadID) // Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !fs.isBucketExist(bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if !fs.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
}
err := fs.abortMultipartUpload(bucket, object, uploadID)
return err
} }