mirror of
https://github.com/minio/minio.git
synced 2025-07-15 03:41:53 -04:00
optimize multipart upload
cherry-pick 33cee9f38a3e662ad68973ab48f595f0435d423f from master branch for improving multipart upload and lock handling
This commit is contained in:
parent
5258a68b45
commit
38709c84b7
@ -358,14 +358,19 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
|||||||
//
|
//
|
||||||
// Implements S3 compatible Upload Part API.
|
// Implements S3 compatible Upload Part API.
|
||||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
||||||
if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
if err = partIDLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||||
|
return PartInfo{}, err
|
||||||
|
}
|
||||||
|
defer partIDLock.Unlock()
|
||||||
|
|
||||||
|
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
|
if err = uploadIDRLock.GetRLock(ctx, globalOperationTimeout); err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
readLocked := true
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if readLocked {
|
if uploadIDRLock != nil {
|
||||||
uploadIDLock.RUnlock()
|
uploadIDRLock.RUnlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -389,6 +394,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
partsMetadata, errs = readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket,
|
partsMetadata, errs = readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket,
|
||||||
uploadIDPath, "")
|
uploadIDPath, "")
|
||||||
|
|
||||||
|
// Unlock upload id locks before, so others can get it.
|
||||||
|
uploadIDRLock.RUnlock()
|
||||||
|
uploadIDRLock = nil
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -468,14 +477,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock here before acquiring write locks all concurrent
|
// Acquire write lock to update metadata.
|
||||||
// PutObjectParts would serialize here updating `xl.meta`
|
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
uploadIDLock.RUnlock()
|
if err = uploadIDWLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||||
readLocked = false
|
|
||||||
if err = uploadIDLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
defer uploadIDLock.Unlock()
|
defer uploadIDWLock.Unlock()
|
||||||
|
|
||||||
// Validates if upload ID exists.
|
// Validates if upload ID exists.
|
||||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user