replication: use latest object info for metrics update (#17333)

This commit is contained in:
Poorna 2023-06-01 18:52:55 -07:00 committed by GitHub
parent 931712dc46
commit e95825a42e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1148,13 +1148,15 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
UserAgent: "Internal: [Replication]", UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName, Host: globalLocalNodeName,
}) })
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
} }
return return
} }
defer gr.Close() defer gr.Close()
objInfo = gr.ObjInfo objInfo = gr.ObjInfo
// make sure we have the latest metadata for metrics calculation
rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
size, err := objInfo.GetActualSize() size, err := objInfo.GetActualSize()
if err != nil { if err != nil {
@ -1170,7 +1172,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
} }
if tgt.Bucket == "" { if tgt.Bucket == "" {
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID)) logger.LogIf(ctx, fmt.Errorf("unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
sendEvent(eventArgs{ sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked, EventName: event.ObjectReplicationNotTracked,
BucketName: bucket, BucketName: bucket,
@ -1229,14 +1231,14 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
r, objInfo, putOpts); err != nil { r, objInfo, putOpts); err != nil {
if minio.ToErrorResponse(err).Code != "PreconditionFailed" { if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed rinfo.ReplicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
} }
} }
} else { } else {
if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil { if _, err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); err != nil {
if minio.ToErrorResponse(err).Code != "PreconditionFailed" { if minio.ToErrorResponse(err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed rinfo.ReplicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
} }
} }
} }
@ -1296,13 +1298,15 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
UserAgent: "Internal: [Replication]", UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName, Host: globalLocalNodeName,
}) })
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
} }
return return
} }
defer gr.Close() defer gr.Close()
objInfo = gr.ObjInfo objInfo = gr.ObjInfo
// make sure we have the latest metadata for metrics calculation
rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
// use latest ObjectInfo to check if previous replication attempt succeeded // use latest ObjectInfo to check if previous replication attempt succeeded
if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) { if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {