mirror of
https://github.com/minio/minio.git
synced 2025-04-20 10:37:31 -04:00
FS/Multipart: Lock() to avoid race during PutObjectPart. (#1842)
Fixes #1839
This commit is contained in:
parent
67bba270a0
commit
611c892f8f
@ -262,11 +262,33 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st
|
|||||||
return fs.newMultipartUpload(bucket, object, meta)
|
return fs.newMultipartUpload(bucket, object, meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
// putObjectPart - reads incoming data until EOF for the part file on
|
// PutObjectPart - reads incoming data until EOF for the part file on
|
||||||
// an ongoing multipart transaction. Internally incoming data is
|
// an ongoing multipart transaction. Internally incoming data is
|
||||||
// written to '.minio/tmp' location and safely renamed to
|
// written to '.minio/tmp' location and safely renamed to
|
||||||
// '.minio/multipart' for reach parts.
|
// '.minio/multipart' for reach parts.
|
||||||
func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
|
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
|
||||||
|
// Verify if bucket is valid.
|
||||||
|
if !IsValidBucketName(bucket) {
|
||||||
|
return "", BucketNameInvalid{Bucket: bucket}
|
||||||
|
}
|
||||||
|
// Verify whether the bucket exists.
|
||||||
|
if !fs.isBucketExist(bucket) {
|
||||||
|
return "", BucketNotFound{Bucket: bucket}
|
||||||
|
}
|
||||||
|
if !IsValidObjectName(object) {
|
||||||
|
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
||||||
|
|
||||||
|
nsMutex.RLock(minioMetaBucket, uploadIDPath)
|
||||||
|
// Just check if the uploadID exists to avoid copy if it doesn't.
|
||||||
|
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
|
||||||
|
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
|
||||||
|
if !uploadIDExists {
|
||||||
|
return "", InvalidUploadID{UploadID: uploadID}
|
||||||
|
}
|
||||||
|
|
||||||
// Hold write lock on the part so that there is no parallel upload on the part.
|
// Hold write lock on the part so that there is no parallel upload on the part.
|
||||||
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
||||||
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
|
||||||
@ -304,7 +326,15 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
|
// Hold write lock as we are updating fs.json
|
||||||
|
nsMutex.Lock(minioMetaBucket, uploadIDPath)
|
||||||
|
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
|
||||||
|
|
||||||
|
// Just check if the uploadID exists to avoid copy if it doesn't.
|
||||||
|
if !fs.isUploadIDExists(bucket, object, uploadID) {
|
||||||
|
return "", InvalidUploadID{UploadID: uploadID}
|
||||||
|
}
|
||||||
|
|
||||||
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
|
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
|
||||||
@ -334,35 +364,6 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
|
|||||||
return newMD5Hex, nil
|
return newMD5Hex, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObjectPart - reads incoming stream and internally erasure codes
|
|
||||||
// them. This call is similar to single put operation but it is part
|
|
||||||
// of the multipart transcation.
|
|
||||||
//
|
|
||||||
// Implements S3 compatible Upload Part API.
|
|
||||||
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
|
|
||||||
// Verify if bucket is valid.
|
|
||||||
if !IsValidBucketName(bucket) {
|
|
||||||
return "", BucketNameInvalid{Bucket: bucket}
|
|
||||||
}
|
|
||||||
// Verify whether the bucket exists.
|
|
||||||
if !fs.isBucketExist(bucket) {
|
|
||||||
return "", BucketNotFound{Bucket: bucket}
|
|
||||||
}
|
|
||||||
if !IsValidObjectName(object) {
|
|
||||||
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hold read lock on the uploadID so that no one aborts it.
|
|
||||||
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
|
|
||||||
defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
|
|
||||||
|
|
||||||
if !fs.isUploadIDExists(bucket, object, uploadID) {
|
|
||||||
return "", InvalidUploadID{UploadID: uploadID}
|
|
||||||
}
|
|
||||||
md5Sum, err := fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
|
|
||||||
return md5Sum, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// listObjectParts - wrapper scanning through
|
// listObjectParts - wrapper scanning through
|
||||||
// '.minio/multipart/bucket/object/UPLOADID'. Lists all the parts
|
// '.minio/multipart/bucket/object/UPLOADID'. Lists all the parts
|
||||||
// saved inside '.minio/multipart/bucket/object/UPLOADID'.
|
// saved inside '.minio/multipart/bucket/object/UPLOADID'.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user