From 6aea950d741e7f9237d1997708b7796b6b5f826d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 18 Nov 2022 03:09:35 -0800 Subject: [PATCH] avoid partID lock validating uploadID exists prematurely (#16086) --- cmd/erasure-multipart.go | 31 ++++++++++++++++--------------- cmd/erasure-server-pool.go | 27 ++++++++++++++++++--------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index b0a7c399a..b7561e908 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -567,16 +567,6 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { auditObjectErasureSet(ctx, object, &er) - // Write lock for this part ID. - // Held throughout the operation. - partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) - plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) - if err != nil { - return PartInfo{}, err - } - pctx := plkctx.Context() - defer partIDLock.Unlock(plkctx.Cancel) - // Read lock for upload id. // Only held while reading the upload metadata. uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) @@ -587,6 +577,13 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo rctx := rlkctx.Context() defer uploadIDRLock.RUnlock(rlkctx.Cancel) + uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) + // Validates if upload ID exists. + fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true) + if err != nil { + return pi, toObjectErr(err, bucket, object, uploadID) + } + data := r.Reader // Validate input data size and it can never be less than zero. if data.Size() < -1 { @@ -594,13 +591,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo return pi, toObjectErr(errInvalidArgument) } - uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - - // Validates if upload ID exists. - fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true) + // Write lock for this part ID, only hold it if we are planning to read from the + // streamto avoid any concurrent updates. + // + // Must be held throughout this call. + partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) + plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) if err != nil { - return pi, toObjectErr(err, bucket, object, uploadID) + return PartInfo{}, err } + pctx := plkctx.Context() + defer partIDLock.Unlock(plkctx.Cancel) onlineDisks := er.getDisks() writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 7b785480b..7b2ac71f8 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1076,9 +1076,12 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o NoLock: true, }) if err != nil { - derrs[j] = err + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + derrs[j] = err + } dobjects[j] = DeletedObject{ ObjectName: obj.ObjectName, + VersionID: obj.VersionID, } return nil } @@ -1435,9 +1438,9 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object, if z.IsSuspended(idx) { continue } - _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + pi, err := pool.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) if err == nil { - return pool.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) + return pi, nil } switch err.(type) { case InvalidUploadID: @@ -1499,9 +1502,9 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object if z.IsSuspended(idx) { continue } - _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + result, err := pool.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) if err == nil { - return pool.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) + return result, nil } switch err.(type) { case InvalidUploadID: @@ -1530,9 +1533,9 @@ func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, o if z.IsSuspended(idx) { continue } - _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + err := pool.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) if err == nil { - return pool.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) + return nil } switch err.(type) { case InvalidUploadID: @@ -1562,10 +1565,16 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket if z.IsSuspended(idx) { continue } - _, err := pool.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + objInfo, err = pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) if err == nil { - return pool.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) + return objInfo, nil } + switch err.(type) { + case InvalidUploadID: + // upload id not found move to next pool + continue + } + return objInfo, err } return objInfo, InvalidUploadID{