From 67bba270a056ad6659c04585e283c07c17ac9a57 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Jun 2016 12:18:56 -0700 Subject: [PATCH] FS: Cleanup and Fix all multipart related operations. (#1836) --- fs-v1-multipart-common.go | 70 +++++++ fs-v1-multipart.go | 430 ++++++++++++++++++++------------------ 2 files changed, 300 insertions(+), 200 deletions(-) create mode 100644 fs-v1-multipart-common.go diff --git a/fs-v1-multipart-common.go b/fs-v1-multipart-common.go new file mode 100644 index 000000000..96ecea0bf --- /dev/null +++ b/fs-v1-multipart-common.go @@ -0,0 +1,70 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "path" + "strings" +) + +// Returns if the prefix is a multipart upload. +func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { + _, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) + return err == nil +} + +// listUploadsInfo - list all uploads info. +func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) { + splitPrefixes := strings.SplitN(prefixPath, "/", 3) + uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage) + if err != nil { + if err == errFileNotFound { + return []uploadInfo{}, nil + } + return nil, err + } + uploads = uploadIDs.Uploads + return uploads, nil +} + +// Checks whether bucket exists. +func (fs fsObjects) isBucketExist(bucket string) bool { + // Check whether bucket exists. + _, err := fs.storage.StatVol(bucket) + if err != nil { + if err == errVolumeNotFound { + return false + } + errorIf(err, "Stat failed on bucket "+bucket+".") + return false + } + return true +} + +// isUploadIDExists - verify if a given uploadID exists and is valid. +func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool { + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + _, err := fs.storage.StatFile(minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) + if err != nil { + if err == errFileNotFound { + return false + } + errorIf(err, "Unable to access upload id"+uploadIDPath) + return false + } + return true +} diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 230a17370..cebe2b77c 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -29,132 +29,9 @@ import ( "github.com/skyrings/skyring-common/tools/uuid" ) -// Checks whether bucket exists. -func (fs fsObjects) isBucketExist(bucket string) bool { - // Check whether bucket exists. - _, err := fs.storage.StatVol(bucket) - if err != nil { - if err == errVolumeNotFound { - return false - } - errorIf(err, "Stat failed on bucket "+bucket+".") - return false - } - return true -} - -// newMultipartUpload - initialize a new multipart. -func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) { - // Verify if bucket name is valid. - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - // Verify whether the bucket exists. - if !fs.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} - } - // Verify if object name is valid. - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // No metadata is set, allocate a new one. - if meta == nil { - meta = make(map[string]string) - } - - // Initialize `fs.json` values. - fsMeta := newFSMetaV1() - - // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - - uploadID = getUUID() - initiated := time.Now().UTC() - // Create 'uploads.json' - if err = writeUploadJSON(bucket, object, uploadID, initiated, fs.storage); err != nil { - return "", err - } - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) - if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) - } - err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) - if err != nil { - if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath) - } - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } - // Return success. - return uploadID, nil -} - -// Returns if the prefix is a multipart upload. -func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { - _, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) - return err == nil -} - -// listUploadsInfo - list all uploads info. -func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) { - splitPrefixes := strings.SplitN(prefixPath, "/", 3) - uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage) - if err != nil { - if err == errFileNotFound { - return []uploadInfo{}, nil - } - return nil, err - } - uploads = uploadIDs.Uploads - return uploads, nil -} - // listMultipartUploads - lists all multipart uploads. func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { result := ListMultipartsInfo{} - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} - } - if !fs.isBucketExist(bucket) { - return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} - } - if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, UnsupportedDelimiter{ - Delimiter: delimiter, - } - } - // Verify if marker has prefix. - if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ - Marker: keyMarker, - Prefix: prefix, - } - } - if uploadIDMarker != "" { - if strings.HasSuffix(keyMarker, slashSeparator) { - return result, InvalidUploadIDKeyCombination{ - UploadIDMarker: uploadIDMarker, - KeyMarker: keyMarker, - } - } - id, err := uuid.Parse(uploadIDMarker) - if err != nil { - return result, err - } - if id.IsZero() { - return result, MalformedUploadID{ - UploadID: uploadIDMarker, - } - } - } - recursive := true if delimiter == slashSeparator { recursive = false @@ -181,7 +58,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var err error var eof bool if uploadIDMarker != "" { + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) if err != nil { return ListMultipartsInfo{}, err } @@ -227,7 +106,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var tmpUploads []uploadMetadata var end bool uploadIDMarker = "" + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) if err != nil { return ListMultipartsInfo{}, err } @@ -264,20 +145,109 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } -// ListMultipartUploads - list multipart uploads. +// ListMultipartUploads - lists all the pending multipart uploads on a +// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a +// delimiter' which allows us to list uploads match a particular +// prefix or lexically starting from 'keyMarker' or delimiting the +// output to get a directory like listing. +// +// Implements S3 compatible ListMultipartUploads API. The resulting +// ListMultipartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + // Validate input arguments. + if !IsValidBucketName(bucket) { + return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + if !fs.isBucketExist(bucket) { + return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} + } + if !IsValidObjectPrefix(prefix) { + return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + } + // Verify if delimiter is anything other than '/', which we do not support. + if delimiter != "" && delimiter != slashSeparator { + return ListMultipartsInfo{}, UnsupportedDelimiter{ + Delimiter: delimiter, + } + } + // Verify if marker has prefix. + if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { + return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ + Marker: keyMarker, + Prefix: prefix, + } + } + if uploadIDMarker != "" { + if strings.HasSuffix(keyMarker, slashSeparator) { + return ListMultipartsInfo{}, InvalidUploadIDKeyCombination{ + UploadIDMarker: uploadIDMarker, + KeyMarker: keyMarker, + } + } + id, err := uuid.Parse(uploadIDMarker) + if err != nil { + return ListMultipartsInfo{}, err + } + if id.IsZero() { + return ListMultipartsInfo{}, MalformedUploadID{ + UploadID: uploadIDMarker, + } + } + } return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } -// NewMultipartUpload - initialize a new multipart upload, returns a unique id. -func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { - meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs. - return fs.newMultipartUpload(bucket, object, meta) +// newMultipartUpload - wrapper for initializing a new multipart +// request, returns back a unique upload id. +// +// Internally this function creates 'uploads.json' associated for the +// incoming object at '.minio/multipart/bucket/object/uploads.json' on +// all the disks. `uploads.json` carries metadata regarding on going +// multipart operation on the object. +func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) { + // No metadata is set, allocate a new one. + if meta == nil { + meta = make(map[string]string) + } + + // Initialize `fs.json` values. + fsMeta := newFSMetaV1() + + // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + + uploadID = getUUID() + initiated := time.Now().UTC() + // Create 'uploads.json' + if err = writeUploadJSON(bucket, object, uploadID, initiated, fs.storage); err != nil { + return "", err + } + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) + if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) + } + err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) + if err != nil { + if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil { + return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath) + } + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + // Return success. + return uploadID, nil } -// putObjectPartCommon - put object part. -func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - // Verify if bucket is valid. +// NewMultipartUpload - initialize a new multipart upload, returns a +// unique id. The unique id returned here is of UUID form, for each +// subsequent request each UUID is unique. +// +// Implements S3 compatible initiate multipart API. +func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { + meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs. + // Verify if bucket name is valid. if !IsValidBucketName(bucket) { return "", BucketNameInvalid{Bucket: bucket} } @@ -285,16 +255,18 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, if !fs.isBucketExist(bucket) { return "", BucketNotFound{Bucket: bucket} } + // Verify if object name is valid. if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} } - if !fs.isUploadIDExists(bucket, object, uploadID) { - return "", InvalidUploadID{UploadID: uploadID} - } - // Hold read lock on the uploadID so that no one aborts it. - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + return fs.newMultipartUpload(bucket, object, meta) +} +// putObjectPart - reads incoming data until EOF for the part file on +// an ongoing multipart transaction. Internally incoming data is +// written to '.minio/tmp' location and safely renamed to +// '.minio/multipart' for reach parts. +func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { // Hold write lock on the part so that there is no parallel upload on the part. nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) @@ -362,29 +334,39 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, return newMD5Hex, nil } -// PutObjectPart - writes the multipart upload chunks. +// PutObjectPart - reads incoming stream and internally erasure codes +// them. This call is similar to single put operation but it is part +// of the multipart transcation. +// +// Implements S3 compatible Upload Part API. func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - return fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex) -} - -func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + return "", BucketNameInvalid{Bucket: bucket} } // Verify whether the bucket exists. if !fs.isBucketExist(bucket) { - return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + return "", BucketNotFound{Bucket: bucket} } if !IsValidObjectName(object) { - return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + return "", ObjectNameInvalid{Bucket: bucket, Object: object} } + + // Hold read lock on the uploadID so that no one aborts it. + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + if !fs.isUploadIDExists(bucket, object, uploadID) { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + return "", InvalidUploadID{UploadID: uploadID} } - // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + md5Sum, err := fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex) + return md5Sum, err +} + +// listObjectParts - wrapper scanning through +// '.minio/multipart/bucket/object/UPLOADID'. Lists all the parts +// saved inside '.minio/multipart/bucket/object/UPLOADID'. +func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { result := ListPartsInfo{} uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -432,26 +414,41 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM return result, nil } -// ListObjectParts - list all parts. +// ListObjectParts - lists all previously uploaded parts for a given +// object and uploadID. Takes additional input of part-number-marker +// to indicate where the listing should begin from. +// +// Implements S3 compatible ListObjectParts API. The resulting +// ListPartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + // Verify whether the bucket exists. + if !fs.isBucketExist(bucket) { + return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + } + // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + + if !fs.isUploadIDExists(bucket, object, uploadID) { + return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + } return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) } -// isUploadIDExists - verify if a given uploadID exists and is valid. -func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool { - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - _, err := fs.storage.StatFile(minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) - if err != nil { - if err == errFileNotFound { - return false - } - errorIf(err, "Unable to access upload id"+uploadIDPath) - return false - } - return true -} - -// CompleteMultipartUpload - implement complete multipart upload transaction. +// CompleteMultipartUpload - completes an ongoing multipart +// transaction after receiving all the parts indicated by the client. +// Returns an md5sum calculated by concatenating all the individual +// md5sums of all the parts. +// +// Implements S3 compatible Complete multipart API. func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -467,10 +464,25 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload Object: object, } } + + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + // Hold lock so that + // 1) no one aborts this multipart upload + // 2) no one does a parallel complete-multipart-upload on this + // multipart upload + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + if !fs.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } + // Read saved fs metadata for ongoing multipart. + fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) if err != nil { @@ -482,23 +494,22 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Loop through all parts, validate them and then commit to disk. for i, part := range parts { + partIdx := fsMeta.ObjectPartIndex(part.PartNumber) + if partIdx == -1 { + return "", InvalidPart{} + } + if fsMeta.Parts[partIdx].ETag != part.ETag { + return "", BadDigest{} + } + // All parts except the last part has to be atleast 5MB. + if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) { + return "", PartTooSmall{} + } // Construct part suffix. partSuffix := fmt.Sprintf("object%d", part.PartNumber) multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) - var fi FileInfo - fi, err = fs.storage.StatFile(minioMetaBucket, multipartPartFile) - if err != nil { - if err == errFileNotFound { - return "", InvalidPart{} - } - return "", toObjectErr(err, minioMetaBucket, multipartPartFile) - } - // All parts except the last part has to be atleast 5MB. - if (i < len(parts)-1) && !isMinAllowedPartSize(fi.Size) { - return "", PartTooSmall{} - } offset := int64(0) - totalLeft := fi.Size + totalLeft := fsMeta.Parts[partIdx].Size for totalLeft > 0 { var n int64 n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer) @@ -535,26 +546,11 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return s3MD5, nil } -// abortMultipartUpload - aborts a multipart upload. +// abortMultipartUpload - wrapper for purging an ongoing multipart +// transaction, deletes uploadID entry from `uploads.json` and purges +// the directory at '.minio/multipart/bucket/object/uploadID' holding +// all the upload parts. func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} - } - if !fs.isBucketExist(bucket) { - return BucketNotFound{Bucket: bucket} - } - if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} - } - if !fs.isUploadIDExists(bucket, object, uploadID) { - return InvalidUploadID{UploadID: uploadID} - } - - // Hold lock so that there is no competing complete-multipart-upload or put-object-part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - // Cleanup all uploaded parts. if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil { return err @@ -568,6 +564,8 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error if uploadIDIdx != -1 { uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } + // There are pending uploads for the same object, preserve + // them update 'uploads.json' in-place. if len(uploadsJSON.Uploads) > 0 { err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage) if err != nil { @@ -575,14 +573,46 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error } return nil } - } + } // No more pending uploads for the object, we purge the entire + // entry at '.minio/multipart/bucket/object'. if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } return nil } -// AbortMultipartUpload - aborts an multipart upload. +// AbortMultipartUpload - aborts an ongoing multipart operation +// signified by the input uploadID. This is an atomic operation +// doesn't require clients to initiate multiple such requests. +// +// All parts are purged from all disks and reference to the uploadID +// would be removed from the system, rollback is not possible on this +// operation. +// +// Implements S3 compatible Abort multipart API, slight difference is +// that this is an atomic idempotent operation. Subsequent calls have +// no affect and further requests to the same uploadID would not be +// honored. func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error { - return fs.abortMultipartUpload(bucket, object, uploadID) + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + if !fs.isBucketExist(bucket) { + return BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ObjectNameInvalid{Bucket: bucket, Object: object} + } + + // Hold lock so that there is no competing complete-multipart-upload or put-object-part. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + + if !fs.isUploadIDExists(bucket, object, uploadID) { + return InvalidUploadID{UploadID: uploadID} + } + + err := fs.abortMultipartUpload(bucket, object, uploadID) + return err }