mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Improve multipart upload (#12514)
Each multipart upload is holding a read lock for the entire upload duration of each part. This makes it impossible for other parts to complete until all currently uploading parts have released their locks. It will also make it impossible for new parts to start as long as the write lock is still being requested, essentially deadlocking uploads until all that may have been granted a read lock has been completed. Refactor to only hold the upload id lock while reading and writing the metadata, but hold a part id lock while the part is being uploaded.
This commit is contained in:
parent
951877f576
commit
33cee9f38a
@ -397,16 +397,27 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
|||||||
//
|
//
|
||||||
// Implements S3 compatible Upload Part API.
|
// Implements S3 compatible Upload Part API.
|
||||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
// Write lock for this part ID.
|
||||||
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
// Held throughout the operation.
|
||||||
|
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
||||||
|
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
|
||||||
|
if err != nil {
|
||||||
|
return PartInfo{}, err
|
||||||
|
}
|
||||||
|
pctx := plkctx.Context()
|
||||||
|
defer partIDLock.Unlock(plkctx.Cancel)
|
||||||
|
|
||||||
|
// Read lock for upload id.
|
||||||
|
// Only held while reading the upload metadata.
|
||||||
|
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
|
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
rctx := rlkctx.Context()
|
rctx := rlkctx.Context()
|
||||||
readLocked := true
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if readLocked {
|
if uploadIDRLock != nil {
|
||||||
uploadIDLock.RUnlock(rlkctx.Cancel)
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -432,13 +443,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
||||||
uploadIDPath, "", false)
|
uploadIDPath, "", false)
|
||||||
|
|
||||||
|
// Unlock upload id locks before, so others can get it.
|
||||||
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
||||||
|
uploadIDRLock = nil
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
|
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
|
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if reducedErr == errErasureWriteQuorum {
|
if reducedErr == errErasureWriteQuorum {
|
||||||
return pi, toObjectErr(reducedErr, bucket, object)
|
return pi, toObjectErr(reducedErr, bucket, object)
|
||||||
}
|
}
|
||||||
@ -447,7 +462,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||||
|
|
||||||
// Pick one from the first valid metadata.
|
// Pick one from the first valid metadata.
|
||||||
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
|
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, err
|
return pi, err
|
||||||
}
|
}
|
||||||
@ -469,7 +484,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
erasure, err := NewErasure(rctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -505,7 +520,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum)
|
n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum)
|
||||||
closeBitrotWriters(writers)
|
closeBitrotWriters(writers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
@ -523,16 +538,14 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock here before acquiring write locks all concurrent
|
// Acquire write lock to update metadata.
|
||||||
// PutObjectParts would serialize here updating `xl.meta`
|
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||||
uploadIDLock.RUnlock(rlkctx.Cancel)
|
wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
|
||||||
readLocked = false
|
|
||||||
wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
wctx := wlkctx.Context()
|
wctx := wlkctx.Context()
|
||||||
defer uploadIDLock.Unlock(wlkctx.Cancel)
|
defer uploadIDWLock.Unlock(wlkctx.Cancel)
|
||||||
|
|
||||||
// Validates if upload ID exists.
|
// Validates if upload ID exists.
|
||||||
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user