add pre-conditions support for PUT calls during replication (#15674)

PUT shall only proceed if pre-conditions are met. The new
code uses

- x-minio-source-mtime
- x-minio-source-etag

to verify whether the object actually needs to be replicated,
allowing us to avoid a StatObject() call.
This commit is contained in:
Harshavardhana 2022-09-14 18:44:04 -07:00 committed by GitHub
parent b910904fa6
commit 124544d834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 271 additions and 42 deletions

View File

@ -946,7 +946,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
wg.Add(1)
go func(index int, tgt *TargetClient) {
defer wg.Done()
rinfos.Targets[index] = replicateObjectToTarget(ctx, ri, objectAPI, tgt)
if ri.OpType == replication.ObjectReplicationType {
// all incoming calls go through optimized path.
rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt)
} else {
rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt)
}
}(i, tgt)
}
wg.Wait()
@ -1013,30 +1018,16 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
}
// replicateObjectToTarget replicates the specified version of the object to destination bucket
// replicateObject replicates object data for specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
var (
closeOnDefer bool
gr *GetObjectReader
size int64
err error
)
sz, _ := objInfo.GetActualSize()
// set defaults for replication action based on operation being performed - actual
// replication action can only be determined after stat on remote. This default is
// needed for updating replication metrics correctly when target is offline.
var rAction replicationAction
switch ri.OpType {
case replication.MetadataReplicationType:
rAction = replicateMetadata
default:
rAction = replicateAll
}
rAction := replicateAll
rinfo = replicatedTargetInfo{
Size: sz,
Arn: tgt.ARN,
@ -1051,6 +1042,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
rinfo.ReplicationResynced = true
return
}
if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
sendEvent(eventArgs{
@ -1065,7 +1057,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
gr, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
@ -1082,15 +1074,159 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
}
return
}
defer func() {
if closeOnDefer {
gr.Close()
}
}()
closeOnDefer = true
defer gr.Close()
objInfo = gr.ObjInfo
size, err = objInfo.GetActualSize()
size, err := objInfo.GetActualSize()
if err != nil {
logger.LogIf(ctx, err)
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return
}
if tgt.Bucket == "" {
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return rinfo
}
defer func() {
if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
rinfo.ReplicationResynced = true
}
rinfo.Duration = time.Since(startTime)
}()
rinfo.ReplicationStatus = replication.Completed
rinfo.Size = size
rinfo.ReplicationAction = rAction
// use core client to avoid doing multipart on PUT
c := &miniogo.Core{Client: tgt.Client}
putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s err:%w", bucket, err))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return
}
var headerSize int
for k, v := range putOpts.Header() {
headerSize += len(k) + len(v)
}
opts := &bandwidth.MonitorReaderOptions{
Bucket: objInfo.Bucket,
HeaderSize: headerSize,
}
newCtx := ctx
if globalBucketMonitor.IsThrottled(bucket) {
var cancel context.CancelFunc
newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
defer cancel()
}
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
if objInfo.isMultipart() {
if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
r, objInfo, putOpts); err != nil {
if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
rinfo.ReplicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
} else {
if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
if minio.ToErrorResponse(err).Code != "PreConditionFailed" {
rinfo.ReplicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
}
return
}
// replicateAll replicates metadata for specified version of the object to destination bucket
// if the destination version is missing it automatically does fully copy as well.
// The source object is then updated to reflect the replication status.
func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
sz, _ := objInfo.GetActualSize()
// set defaults for replication action based on operation being performed - actual
// replication action can only be determined after stat on remote. This default is
// needed for updating replication metrics correctly when target is offline.
rAction := replicateMetadata
rinfo = replicatedTargetInfo{
Size: sz,
Arn: tgt.ARN,
PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN),
ReplicationStatus: replication.Failed,
OpType: ri.OpType,
ReplicationAction: rAction,
}
if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
rinfo.ReplicationStatus = replication.Completed
rinfo.ReplicationResynced = true
return
}
if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return
}
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
})
if err != nil {
if !isErrObjectNotFound(err) {
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
}
return
}
defer gr.Close()
objInfo = gr.ObjInfo
size, err := objInfo.GetActualSize()
if err != nil {
logger.LogIf(ctx, err)
sendEvent(eventArgs{
@ -1149,8 +1285,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
// as Completed.
//
// Note: Replication Stats would have been updated despite metadata update failure.
gr.Close()
closeOnDefer = false
rinfo.ReplicationAction = rAction
rinfo.ReplicationStatus = replication.Completed
}
@ -1221,8 +1355,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
}
}
}
gr.Close()
closeOnDefer = false
return
}

View File

@ -327,6 +327,24 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
// disks. `uploads.json` carries metadata regarding on-going multipart
// operation(s) on the object.
func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
if opts.CheckPrecondFn != nil {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
rctx := lkctx.Context()
obj, err := er.getObjectInfo(rctx, bucket, object, opts)
lk.RUnlock(lkctx.Cancel)
if err != nil {
return nil, err
}
if opts.CheckPrecondFn(obj) {
return nil, PreConditionFailed{}
}
}
userDefined := cloneMSS(opts.UserDefined)
onlineDisks := er.getDisks()

View File

@ -915,6 +915,16 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
auditObjectErasureSet(ctx, object, &er)
if opts.CheckPrecondFn != nil {
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
if err != nil {
return objInfo, err
}
if opts.CheckPrecondFn(obj) {
return objInfo, PreConditionFailed{}
}
}
data := r.Reader
userDefined := cloneMSS(opts.UserDefined)
@ -1146,11 +1156,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
})
}
if userDefined["etag"] == "" {
userDefined["etag"] = r.MD5CurrentHexString()
if opts.PreserveETag != "" {
userDefined["etag"] = opts.PreserveETag
}
userDefined["etag"] = r.MD5CurrentHexString()
if opts.PreserveETag != "" {
userDefined["etag"] = opts.PreserveETag
}
// Guess content-type from the extension if possible.

View File

@ -283,14 +283,12 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
}
}
etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag))
if etag != "" {
if metadata == nil {
metadata = make(map[string]string, 1)
}
metadata["etag"] = etag
if metadata == nil {
metadata = make(map[string]string)
}
etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag))
wantCRC, err := hash.GetContentChecksum(r)
if err != nil {
return opts, InvalidArgument{
@ -310,6 +308,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
Versioned: versioned,
VersionSuspended: versionSuspended,
MTime: mtime,
PreserveETag: etag,
WantChecksum: wantCRC,
}, nil
}

View File

@ -130,6 +130,54 @@ func checkCopyObjectPreconditions(ctx context.Context, w http.ResponseWriter, r
return false
}
// checkPreconditionsPUT validates the replication pre-conditions for a PUT.
// It returns true if the PUT operation should NOT proceed — i.e. the
// destination already holds this exact version (the pre-condition "failed")
// and an error response has already been written to w.
//
// Preconditions supported are:
//
// x-minio-source-mtime
// x-minio-source-etag
func checkPreconditionsPUT(ctx context.Context, w http.ResponseWriter, r *http.Request, objInfo ObjectInfo, opts ObjectOptions) bool {
// Return false for methods other than PUT.
if r.Method != http.MethodPut {
return false
}
// If the destination object doesn't have a modtime (IsZero), or the modtime
// is obviously garbage (Unix time == 0), skip the mtime-based comparison
// below and let the upload proceed.
if objInfo.ModTime.IsZero() || objInfo.ModTime.Equal(time.Unix(0, 0)) {
return false
}
// If top level is a delete marker proceed to upload.
if objInfo.DeleteMarker {
return false
}
// Headers to be set if object content is not going to be written to the client.
writeHeaders := func() {
// set common headers
setCommonHeaders(w)
// set object-related metadata headers
w.Header().Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat))
if objInfo.ETag != "" {
w.Header()[xhttp.ETag] = []string{"\"" + objInfo.ETag + "\""}
}
}
// The upload is redundant only when the destination already has the same
// etag and version ID, and its modtime is at least as new as the source's
// (opts carries x-minio-source-etag / x-minio-source-mtime from the caller).
etagMatch := opts.PreserveETag != "" && isETagEqual(objInfo.ETag, opts.PreserveETag)
vidMatch := opts.VersionID != "" && opts.VersionID == objInfo.VersionID
mtimeMatch := !opts.MTime.IsZero() && objInfo.ModTime.Unix() >= opts.MTime.Unix()
if etagMatch && vidMatch && mtimeMatch {
writeHeaders()
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
return true
}
// Object content should be persisted.
return false
}
// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
// Preconditions supported are:
//

View File

@ -1786,6 +1786,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
opts.IndexCB = idxCb
if !opts.MTime.IsZero() && opts.PreserveETag != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
if api.CacheAPI() != nil {
putObject = api.CacheAPI().PutObject
}

View File

@ -176,6 +176,18 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
if !opts.MTime.IsZero() && opts.PreserveETag != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
checksumType := hash.NewChecksumType(r.Header.Get(xhttp.AmzChecksumAlgo))
if checksumType.Is(hash.ChecksumInvalid) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequestParameter), r.URL)