From e8d14c0d90db28c455101ace1c3d28a92118b4a7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 10 May 2024 17:31:22 -0700 Subject: [PATCH] verify preconditions during CompleteMultipart (#19713) Bonus: hold the write lock properly to apply optimistic concurrency during NewMultipartUpload() --- cmd/bucket-replication.go | 19 ++++++---- cmd/erasure-multipart.go | 63 ++++++++++++++++++++++---------- cmd/object-multipart-handlers.go | 12 ++++++ 3 files changed, 67 insertions(+), 27 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index eb30a5810..7f04fa66d 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1542,13 +1542,15 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } else { _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts) } - if rinfo.Err != nil && minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" { - rinfo.ReplicationStatus = replication.Failed - replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w", - bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err)) - } - if rinfo.Err != nil && minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { - globalBucketTargetSys.markOffline(tgt.EndpointURL()) + if rinfo.Err != nil { + if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" { + rinfo.ReplicationStatus = replication.Failed + replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w", + bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err)) + } + if minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } } } return @@ -1568,7 +1570,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob break } if minio.ToErrorResponse(err).Code == "PreconditionFailed" { - return err + return nil } attempts++ time.Sleep(time.Duration(rand.Int63n(int64(time.Second)))) @@ -1643,6 +1645,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob UserMetadata: map[string]string{validSSEReplicationHeaders[ReservedMetadataPrefix+"Actual-Object-Size"]: objInfo.UserDefined[ReservedMetadataPrefix+"actual-size"]}, Internal: minio.AdvancedPutOptions{ SourceMTime: objInfo.ModTime, + SourceETag: objInfo.ETag, // always set this to distinguish between `mc mirror` replication and serverside ReplicationRequest: true, }, diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 16183609e..f71804602 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -381,21 +381,24 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec // operation(s) on the object. func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) { if opts.CheckPrecondFn != nil { - // Lock the object before reading. - lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) - if err != nil { - return nil, err + if !opts.NoLock { + ns := er.NewNSLock(bucket, object) + lkctx, err := ns.GetLock(ctx, globalOperationTimeout) + if err != nil { + return nil, err + } + ctx = lkctx.Context() + defer ns.Unlock(lkctx) + opts.NoLock = true } - rctx := lkctx.Context() - obj, err := er.getObjectInfo(rctx, bucket, object, opts) - lk.RUnlock(lkctx) - if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) { - return nil, err - } - if opts.CheckPrecondFn(obj) { + + obj, err := er.getObjectInfo(ctx, bucket, object, opts) + if err == nil && opts.CheckPrecondFn(obj) { return nil, PreConditionFailed{} } + if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) { + return nil, err + } } userDefined := cloneMSS(opts.UserDefined) @@ -1003,6 +1006,27 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str auditObjectErasureSet(ctx, object, &er) } + if opts.CheckPrecondFn != nil { + if !opts.NoLock { + ns := er.NewNSLock(bucket, object) + lkctx, err := ns.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer ns.Unlock(lkctx) + opts.NoLock = true + } + + obj, err := er.getObjectInfo(ctx, bucket, object, opts) + if err == nil && opts.CheckPrecondFn(obj) { + return ObjectInfo{}, PreConditionFailed{} + } + if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) { + return ObjectInfo{}, err + } + } + // Hold write locks to verify uploaded parts, also disallows any // parallel PutObjectPart() requests. uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) @@ -1237,14 +1261,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } - // Hold namespace to complete the transaction - lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return oi, err + if !opts.NoLock { + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) } - ctx = lkctx.Context() - defer lk.Unlock(lkctx) if checksumType.IsSet() { checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 073d6206d..c218a8ca4 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -997,6 +997,18 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite multipartETag := etag.Multipart(completeETags...) opts.UserDefined["etag"] = multipartETag.String() + if opts.PreserveETag != "" || + r.Header.Get(xhttp.IfMatch) != "" || + r.Header.Get(xhttp.IfNoneMatch) != "" { + opts.CheckPrecondFn = func(oi ObjectInfo) bool { + if _, err := DecryptObjectInfo(&oi, r); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return true + } + return checkPreconditionsPUT(ctx, w, r, oi, opts) + } + } + objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)