simplify common functions in replication (#19480)

This commit is contained in:
Harshavardhana 2024-04-11 17:27:32 -07:00 committed by GitHub
parent 5206c0e883
commit 7e3166475d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1267,23 +1267,19 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
}
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
if objInfo.isMultipart() {
if rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
r, objInfo, putOpts); rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s (target: %s)", bucket, objInfo.Name, objInfo.VersionID, rinfo.Err, tgt.EndpointURL()))
}
}
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
} else {
if _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): %s (target: %s)", bucket, objInfo.Name, objInfo.VersionID, rinfo.Err, tgt.EndpointURL()))
}
}
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
}
if rinfo.Err != nil && minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
globalBucketTargetSys.markOffline(tgt.EndpointURL())
if rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): to (target: %s): %w",
bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
}
if minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
globalBucketTargetSys.markOffline(tgt.EndpointURL())
}
}
return
}
@ -1537,24 +1533,14 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
}
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
if objInfo.isMultipart() {
if rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object,
r, objInfo, putOpts); rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
} else {
rinfo.ReplicationStatus = replication.Completed
}
}
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
} else {
if _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts); rinfo.Err != nil {
if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
} else {
rinfo.ReplicationStatus = replication.Completed
}
}
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
}
if rinfo.Err != nil && minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
rinfo.ReplicationStatus = replication.Failed
replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w",
bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
}
if rinfo.Err != nil && minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
globalBucketTargetSys.markOffline(tgt.EndpointURL())