diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 9ce885475..e4bf92414 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -946,7 +946,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje wg.Add(1) go func(index int, tgt *TargetClient) { defer wg.Done() - rinfos.Targets[index] = replicateObjectToTarget(ctx, ri, objectAPI, tgt) + if ri.OpType == replication.ObjectReplicationType { + // all incoming calls go through optimized path. + rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt) + } else { + rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt) + } }(i, tgt) } wg.Wait() @@ -1013,30 +1018,16 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } } -// replicateObjectToTarget replicates the specified version of the object to destination bucket +// replicateObject replicates object data for specified version of the object to destination bucket // The source object is then updated to reflect the replication status. -func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { +func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { startTime := time.Now() objInfo := ri.ObjectInfo.Clone() bucket := objInfo.Bucket object := objInfo.Name - var ( - closeOnDefer bool - gr *GetObjectReader - size int64 - err error - ) sz, _ := objInfo.GetActualSize() - // set defaults for replication action based on operation being performed - actual - // replication action can only be determined after stat on remote. This default is - // needed for updating replication metrics correctly when target is offline. - var rAction replicationAction - switch ri.OpType { - case replication.MetadataReplicationType: - rAction = replicateMetadata - default: - rAction = replicateAll - } + + rAction := replicateAll rinfo = replicatedTargetInfo{ Size: sz, Arn: tgt.ARN, @@ -1051,6 +1042,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object rinfo.ReplicationResynced = true return } + if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN)) sendEvent(eventArgs{ @@ -1065,7 +1057,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) - gr, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ + gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID, Versioned: versioned, VersionSuspended: versionSuspended, @@ -1082,15 +1074,159 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object } return } - defer func() { - if closeOnDefer { - gr.Close() - } - }() - closeOnDefer = true + defer gr.Close() objInfo = gr.ObjInfo - size, err = objInfo.GetActualSize() + + size, err := objInfo.GetActualSize() + if err != nil { + logger.LogIf(ctx, err) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + return + } + + if tgt.Bucket == "" { + logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + return rinfo + } + defer func() { + if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" { + rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID) + rinfo.ReplicationResynced = true + } + rinfo.Duration = time.Since(startTime) + }() + + rinfo.ReplicationStatus = replication.Completed + rinfo.Size = size + rinfo.ReplicationAction = rAction + // use core client to avoid doing multipart on PUT + c := &miniogo.Core{Client: tgt.Client} + + putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s err:%w", bucket, err)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + return + } + + var headerSize int + for k, v := range putOpts.Header() { + headerSize += len(k) + len(v) + } + + opts := &bandwidth.MonitorReaderOptions{ + Bucket: objInfo.Bucket, + HeaderSize: headerSize, + } + newCtx := ctx + if globalBucketMonitor.IsThrottled(bucket) { + var cancel context.CancelFunc + newCtx, cancel = context.WithTimeout(ctx, throttleDeadline) + defer cancel() + } + r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) + if objInfo.isMultipart() { + if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, + r, objInfo, putOpts); err != nil { + if minio.ToErrorResponse(err).Code != "PreConditionFailed" { + rinfo.ReplicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + } + } + } else { + if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil { + if minio.ToErrorResponse(err).Code != "PreConditionFailed" { + rinfo.ReplicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + } + } + } + return +} + +// replicateAll replicates metadata for specified version of the object to destination bucket +// if the destination version is missing it automatically does fully copy as well. +// The source object is then updated to reflect the replication status. +func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { + startTime := time.Now() + objInfo := ri.ObjectInfo.Clone() + bucket := objInfo.Bucket + object := objInfo.Name + sz, _ := objInfo.GetActualSize() + + // set defaults for replication action based on operation being performed - actual + // replication action can only be determined after stat on remote. This default is + // needed for updating replication metrics correctly when target is offline. + rAction := replicateMetadata + + rinfo = replicatedTargetInfo{ + Size: sz, + Arn: tgt.ARN, + PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN), + ReplicationStatus: replication.Failed, + OpType: ri.OpType, + ReplicationAction: rAction, + } + + if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) { + rinfo.ReplicationStatus = replication.Completed + rinfo.ReplicationResynced = true + return + } + + if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + return + } + + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) + + gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ + VersionID: objInfo.VersionID, + Versioned: versioned, + VersionSuspended: versionSuspended, + }) + if err != nil { + if !isErrObjectNotFound(err) { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) + } + return + } + defer gr.Close() + + objInfo = gr.ObjInfo + + size, err := objInfo.GetActualSize() if err != nil { logger.LogIf(ctx, err) sendEvent(eventArgs{ @@ -1149,8 +1285,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object // as Completed. // // Note: Replication Stats would have been updated despite metadata update failure. - gr.Close() - closeOnDefer = false rinfo.ReplicationAction = rAction rinfo.ReplicationStatus = replication.Completed } @@ -1221,8 +1355,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object } } } - gr.Close() - closeOnDefer = false return } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 632248311..d146f268b 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -327,6 +327,24 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec // disks. `uploads.json` carries metadata regarding on-going multipart // operation(s) on the object. func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) { + if opts.CheckPrecondFn != nil { + // Lock the object before reading. + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { + return nil, err + } + rctx := lkctx.Context() + obj, err := er.getObjectInfo(rctx, bucket, object, opts) + lk.RUnlock(lkctx.Cancel) + if err != nil { + return nil, err + } + if opts.CheckPrecondFn(obj) { + return nil, PreConditionFailed{} + } + } + userDefined := cloneMSS(opts.UserDefined) onlineDisks := er.getDisks() diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a704e25b9..07b2b3993 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -915,6 +915,16 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { auditObjectErasureSet(ctx, object, &er) + if opts.CheckPrecondFn != nil { + obj, err := er.getObjectInfo(ctx, bucket, object, opts) + if err != nil { + return objInfo, err + } + if opts.CheckPrecondFn(obj) { + return objInfo, PreConditionFailed{} + } + } + data := r.Reader userDefined := cloneMSS(opts.UserDefined) @@ -1146,11 +1156,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st }) } - if userDefined["etag"] == "" { - userDefined["etag"] = r.MD5CurrentHexString() - if opts.PreserveETag != "" { - userDefined["etag"] = opts.PreserveETag - } + userDefined["etag"] = r.MD5CurrentHexString() + if opts.PreserveETag != "" { + userDefined["etag"] = opts.PreserveETag } // Guess content-type from the extension if possible. diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index 7027e6d5e..98229902b 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -283,14 +283,12 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada } } - etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag)) - if etag != "" { - if metadata == nil { - metadata = make(map[string]string, 1) - } - metadata["etag"] = etag + if metadata == nil { + metadata = make(map[string]string) } + etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag)) + wantCRC, err := hash.GetContentChecksum(r) if err != nil { return opts, InvalidArgument{ @@ -310,6 +308,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada Versioned: versioned, VersionSuspended: versionSuspended, MTime: mtime, + PreserveETag: etag, WantChecksum: wantCRC, }, nil } diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index ff7c6072b..4ba960b3c 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -130,6 +130,54 @@ func checkCopyObjectPreconditions(ctx context.Context, w http.ResponseWriter, r return false } +// Validates the preconditions. Returns true if PUT operation should not proceed. +// Preconditions supported are: +// +// x-minio-source-mtime +// x-minio-source-etag +func checkPreconditionsPUT(ctx context.Context, w http.ResponseWriter, r *http.Request, objInfo ObjectInfo, opts ObjectOptions) bool { + // Return false for methods other than PUT. + if r.Method != http.MethodPut { + return false + } + // If the object doesn't have a modtime (IsZero), or the modtime + // is obviously garbage (Unix time == 0), then ignore modtimes + // and don't process the If-Modified-Since header. + if objInfo.ModTime.IsZero() || objInfo.ModTime.Equal(time.Unix(0, 0)) { + return false + } + + // If top level is a delete marker proceed to upload. + if objInfo.DeleteMarker { + return false + } + + // Headers to be set of object content is not going to be written to the client. + writeHeaders := func() { + // set common headers + setCommonHeaders(w) + + // set object-related metadata headers + w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat)) + + if objInfo.ETag != "" { + w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""} + } + } + + etagMatch := opts.PreserveETag != "" && isETagEqual(objInfo.ETag, opts.PreserveETag) + vidMatch := opts.VersionID != "" && opts.VersionID == objInfo.VersionID + mtimeMatch := !opts.MTime.IsZero() && objInfo.ModTime.Unix() >= opts.MTime.Unix() + if etagMatch && vidMatch && mtimeMatch { + writeHeaders() + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL) + return true + } + + // Object content should be persisted. + return false +} + // Validates the preconditions. Returns true if GET/HEAD operation should not proceed. // Preconditions supported are: // diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index da9d1ea3d..2dfb7b9f3 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1786,6 +1786,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } opts.IndexCB = idxCb + if !opts.MTime.IsZero() && opts.PreserveETag != "" { + opts.CheckPrecondFn = func(oi ObjectInfo) bool { + if objectAPI.IsEncryptionSupported() { + if _, err := DecryptObjectInfo(&oi, r); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return true + } + } + return checkPreconditionsPUT(ctx, w, r, oi, opts) + } + } + if api.CacheAPI() != nil { putObject = api.CacheAPI().PutObject } diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 207e2c8ef..86b6c213e 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -176,6 +176,18 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } + if !opts.MTime.IsZero() && opts.PreserveETag != "" { + opts.CheckPrecondFn = func(oi ObjectInfo) bool { + if objectAPI.IsEncryptionSupported() { + if _, err := DecryptObjectInfo(&oi, r); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return true + } + } + return checkPreconditionsPUT(ctx, w, r, oi, opts) + } + } + checksumType := hash.NewChecksumType(r.Header.Get(xhttp.AmzChecksumAlgo)) if checksumType.Is(hash.ChecksumInvalid) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequestParameter), r.URL)