verify preconditions during CompleteMultipart (#19713)

Bonus: hold the write lock properly to apply
optimistic concurrency during NewMultipartUpload()
This commit is contained in:
Harshavardhana 2024-05-10 17:31:22 -07:00 committed by GitHub
parent 60d7e8143a
commit e8d14c0d90
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 67 additions and 27 deletions

View File

@ -1542,13 +1542,15 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
} else {
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
}
if rinfo.Err != nil && minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w",
bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
}
if rinfo.Err != nil && minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
globalBucketTargetSys.markOffline(tgt.EndpointURL())
if rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w",
bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
}
if minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
globalBucketTargetSys.markOffline(tgt.EndpointURL())
}
}
}
return
@ -1568,7 +1570,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
break
}
if minio.ToErrorResponse(err).Code == "PreconditionFailed" {
return err
return nil
}
attempts++
time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
@ -1643,6 +1645,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
UserMetadata: map[string]string{validSSEReplicationHeaders[ReservedMetadataPrefix+"Actual-Object-Size"]: objInfo.UserDefined[ReservedMetadataPrefix+"actual-size"]},
Internal: minio.AdvancedPutOptions{
SourceMTime: objInfo.ModTime,
SourceETag: objInfo.ETag,
// always set this to distinguish between `mc mirror` replication and serverside
ReplicationRequest: true,
},

View File

@ -381,21 +381,24 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
// operation(s) on the object.
func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
if opts.CheckPrecondFn != nil {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
if !opts.NoLock {
ns := er.NewNSLock(bucket, object)
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx)
opts.NoLock = true
}
rctx := lkctx.Context()
obj, err := er.getObjectInfo(rctx, bucket, object, opts)
lk.RUnlock(lkctx)
if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
return nil, err
}
if opts.CheckPrecondFn(obj) {
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
if err == nil && opts.CheckPrecondFn(obj) {
return nil, PreConditionFailed{}
}
if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
return nil, err
}
}
userDefined := cloneMSS(opts.UserDefined)
@ -1003,6 +1006,27 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
auditObjectErasureSet(ctx, object, &er)
}
if opts.CheckPrecondFn != nil {
if !opts.NoLock {
ns := er.NewNSLock(bucket, object)
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx)
opts.NoLock = true
}
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
if err == nil && opts.CheckPrecondFn(obj) {
return ObjectInfo{}, PreConditionFailed{}
}
if err != nil && !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !isErrReadQuorum(err) {
return ObjectInfo{}, err
}
}
// Hold write locks to verify uploaded parts, also disallows any
// parallel PutObjectPart() requests.
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
@ -1237,14 +1261,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
}
// Hold namespace to complete the transaction
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
if !opts.NoLock {
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx)
if checksumType.IsSet() {
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart

View File

@ -997,6 +997,18 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
multipartETag := etag.Multipart(completeETags...)
opts.UserDefined["etag"] = multipartETag.String()
if opts.PreserveETag != "" ||
r.Header.Get(xhttp.IfMatch) != "" ||
r.Header.Get(xhttp.IfNoneMatch) != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)