move readLock on uploadId at EOF in PutObjectPart (#258)

we do not need to hold the read locks at the higher
layer instead before reading the body, instead hold
the read locks properly at the time of renamePart()
for protection from racy part overwrites to compete
with concurrent completeMultipart().
This commit is contained in:
Harshavardhana 2024-12-14 03:06:44 +05:30
parent 2b008c598b
commit 54ecce66f0
2 changed files with 9 additions and 10 deletions

View File

@ -744,6 +744,15 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
ctx = plkctx.Context()
defer partIDLock.Unlock(plkctx)
// Read lock for upload id, only held while reading the upload metadata.
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
ctx = rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx)
onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
if err != nil {
if errors.Is(err, errFileNotFound) {

View File

@ -1858,16 +1858,6 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object,
return PartInfo{}, err
}
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := z.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
ctx = rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx)
if z.SinglePool() {
return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}