mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
fix: cleanup locking, cancel context upon lock timeout (#12183)
upon errors to acquire lock context would still leak, since the cancel would never be called. since the lock is never acquired - proactively clear it before returning.
This commit is contained in:
@@ -378,14 +378,15 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
||||
// Implements S3 compatible Upload Part API.
|
||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
rctx, rcancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
rctx := rlkctx.Context()
|
||||
readLocked := true
|
||||
defer func() {
|
||||
if readLocked {
|
||||
uploadIDLock.RUnlock(rcancel)
|
||||
uploadIDLock.RUnlock(rlkctx.Cancel)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -505,13 +506,14 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
|
||||
// Unlock here before acquiring write locks all concurrent
|
||||
// PutObjectParts would serialize here updating `xl.meta`
|
||||
uploadIDLock.RUnlock(rcancel)
|
||||
uploadIDLock.RUnlock(rlkctx.Cancel)
|
||||
readLocked = false
|
||||
wctx, wcancel, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
||||
wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.Unlock(wcancel)
|
||||
wctx := wlkctx.Context()
|
||||
defer uploadIDLock.Unlock(wlkctx.Cancel)
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
||||
@@ -592,11 +594,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
}
|
||||
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return MultipartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock(cancel)
|
||||
ctx = lkctx.Context()
|
||||
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
||||
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
@@ -641,11 +644,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
// replied back to the client.
|
||||
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock(cancel)
|
||||
ctx = lkctx.Context()
|
||||
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
||||
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
@@ -735,19 +739,20 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
// Hold read-locks to verify uploaded parts, also disallows
|
||||
// parallel part uploads as well.
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
ctx, cancel, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer uploadIDLock.RUnlock(cancel)
|
||||
rctx := rlkctx.Context()
|
||||
defer uploadIDLock.RUnlock(rlkctx.Cancel)
|
||||
|
||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
||||
return oi, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
// Check if an object is present as one of the parent dir.
|
||||
// -- FIXME. (needs a new kind of lock).
|
||||
if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) {
|
||||
if opts.ParentIsObject != nil && opts.ParentIsObject(rctx, bucket, path.Dir(object)) {
|
||||
return oi, toObjectErr(errFileParentIsFile, bucket, object)
|
||||
}
|
||||
|
||||
@@ -761,15 +766,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return oi, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
@@ -777,7 +782,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
@@ -871,8 +876,18 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
}
|
||||
|
||||
// Hold namespace to complete the transaction
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
|
||||
// Write final `xl.meta` at uploadID location
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
||||
onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
@@ -889,14 +904,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
}
|
||||
}
|
||||
|
||||
// Hold namespace to complete the transaction
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
ctx, cancel, err = lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer lk.Unlock(cancel)
|
||||
|
||||
// Rename the multipart object to final location.
|
||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
||||
partsMetadata, bucket, object, writeQuorum); err != nil {
|
||||
@@ -934,11 +941,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
// operation.
|
||||
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
||||
lk := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
ctx, cancel, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer lk.Unlock(cancel)
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user