diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 4b5a006bf..3b9026961 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -571,7 +571,8 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
 
 // replicateObject replicates the specified version of the object to destination bucket
 // The source object is then updated to reflect the replication status.
-func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
+func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
+	objInfo := ri.ObjectInfo
 	bucket := objInfo.Bucket
 	object := objInfo.Name
 
@@ -740,34 +741,42 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
 	if !ok {
 		return
 	}
-
-	// This lower level implementation is necessary to avoid write locks from CopyObject.
-	poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
-	if err != nil {
-		logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
-	} else {
-		fi := FileInfo{}
-		fi.VersionID = objInfo.VersionID
-		fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
-		for k, v := range objInfo.UserDefined {
-			fi.Metadata[k] = v
-		}
-		if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
+	// Leave metadata in `PENDING` state if inline replication fails to save iops
+	if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed {
+		// This lower level implementation is necessary to avoid write locks from CopyObject.
+		poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
+		if err != nil {
 			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+		} else {
+			fi := FileInfo{}
+			fi.VersionID = objInfo.VersionID
+			fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
+			for k, v := range objInfo.UserDefined {
+				fi.Metadata[k] = v
+			}
+			if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
+				logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+			}
 		}
+		opType := replication.MetadataReplicationType
+		if rtype == replicateAll {
+			opType = replication.ObjectReplicationType
+		}
+		globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
+		sendEvent(eventArgs{
+			EventName:  eventName,
+			BucketName: bucket,
+			Object:     objInfo,
+			Host:       "Internal: [Replication]",
+		})
 	}
-
-	opType := replication.MetadataReplicationType
-	if rtype == replicateAll {
-		opType = replication.ObjectReplicationType
+	// re-queue failures once more - keep a retry count to avoid flooding the queue if
+	// the target site is down. Leave it to scanner to catch up instead.
+	if replicationStatus == replication.Failed && ri.RetryCount < 1 {
+		ri.OpType = replication.HealReplicationType
+		ri.RetryCount++
+		globalReplicationPool.queueReplicaTask(ctx, ri)
 	}
-	globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
-	sendEvent(eventArgs{
-		EventName:  eventName,
-		BucketName: bucket,
-		Object:     objInfo,
-		Host:       "Internal: [Replication]",
-	})
 }
 
 // filterReplicationStatusMetadata filters replication status metadata for COPY
@@ -808,9 +817,9 @@ var (
 type ReplicationPool struct {
 	mu                 sync.Mutex
 	size               int
-	replicaCh          chan ObjectInfo
+	replicaCh          chan ReplicateObjectInfo
 	replicaDeleteCh    chan DeletedObjectVersionInfo
-	mrfReplicaCh       chan ObjectInfo
+	mrfReplicaCh       chan ReplicateObjectInfo
 	mrfReplicaDeleteCh chan DeletedObjectVersionInfo
 	killCh             chan struct{}
 	wg                 sync.WaitGroup
@@ -821,9 +830,9 @@ type ReplicationPool struct {
 // NewReplicationPool creates a pool of replication workers of specified size
 func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
 	pool := &ReplicationPool{
-		replicaCh:          make(chan ObjectInfo, 1000),
+		replicaCh:          make(chan ReplicateObjectInfo, 1000),
 		replicaDeleteCh:    make(chan DeletedObjectVersionInfo, 1000),
-		mrfReplicaCh:       make(chan ObjectInfo, 100000),
+		mrfReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 		mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
 		ctx:                ctx,
 		objLayer:           o,
@@ -895,7 +904,7 @@ func (p *ReplicationPool) Resize(n int) {
 	}
 }
 
-func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
+func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) {
 	if p == nil {
 		return
 	}
@@ -903,8 +912,8 @@ func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
 	case <-ctx.Done():
 		close(p.replicaCh)
 		close(p.mrfReplicaCh)
-	case p.replicaCh <- oi:
-	case p.mrfReplicaCh <- oi:
+	case p.replicaCh <- ri:
+	case p.mrfReplicaCh <- ri:
 		// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
 	default:
 	}
@@ -1065,9 +1074,9 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op
 
 func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
 	if sync {
-		replicateObject(ctx, objInfo, o)
+		replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o)
 	} else {
-		globalReplicationPool.queueReplicaTask(GlobalContext, objInfo)
+		globalReplicationPool.queueReplicaTask(GlobalContext, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
 	}
 	if sz, err := objInfo.GetActualSize(); err == nil {
 		globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index e0cd7fdcc..51fa5771c 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -1109,11 +1109,11 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
 	case replication.Pending:
 		sizeS.pendingCount++
 		sizeS.pendingSize += oi.Size
-		globalReplicationPool.queueReplicaTask(ctx, oi)
+		globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
 	case replication.Failed:
 		sizeS.failedSize += oi.Size
 		sizeS.failedCount++
-		globalReplicationPool.queueReplicaTask(ctx, oi)
+		globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
 	case replication.Completed, "COMPLETE":
 		sizeS.replicatedSize += oi.Size
 	case replication.Replica:
diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go
index 3d72f78c4..210814490 100644
--- a/cmd/object-api-datatypes.go
+++ b/cmd/object-api-datatypes.go
@@ -220,6 +220,13 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) {
 	return cinfo
 }
 
+// ReplicateObjectInfo represents object info to be replicated
+type ReplicateObjectInfo struct {
+	ObjectInfo
+	OpType     replication.Type
+	RetryCount uint32
+}
+
 // MultipartInfo captures metadata information about the uploadId
 // this data structure is used primarily for some internal purposes
 // for verifying upload type such as was the upload
diff --git a/pkg/bucket/replication/replication.go b/pkg/bucket/replication/replication.go
index d6fd87ad2..e743f54c2 100644
--- a/pkg/bucket/replication/replication.go
+++ b/pkg/bucket/replication/replication.go
@@ -122,6 +122,7 @@ const (
 	ObjectReplicationType Type = 1 + iota
 	DeleteReplicationType
 	MetadataReplicationType
+	HealReplicationType
 )
 
 // ObjectOpts provides information to deduce whether replication
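Note (illustrative, not part of the patch): the re-queue logic in replicateObject above retries a failed replication exactly once, tagging the retry as a heal-type operation so that any further failure is left for the scanner to pick up. The standalone Go sketch below reproduces that bounded-retry pattern; task, queue, and replicate are hypothetical stand-ins for ReplicateObjectInfo, queueReplicaTask, and replicateObject.

// Illustrative sketch of the bounded-retry pattern introduced by this diff.
package main

import "fmt"

type opType int

const (
	objectOp opType = iota
	healOp
)

// task mirrors ReplicateObjectInfo: the work item carries its own
// operation type and retry count, so a re-queued attempt is
// distinguishable from a fresh one.
type task struct {
	name       string
	op         opType
	retryCount uint32
}

// replicate simulates one replication attempt; failed reports
// whether the remote write succeeded.
func replicate(t task) (failed bool) {
	fmt.Printf("replicating %s (op=%d retry=%d)\n", t.name, t.op, t.retryCount)
	return true // pretend the target site is down
}

func main() {
	queue := make(chan task, 10)
	queue <- task{name: "bucket/object"}

	for {
		select {
		case t := <-queue:
			if failed := replicate(t); failed && t.retryCount < 1 {
				// Re-queue exactly once, marked as a heal-type op;
				// further failures are left for the scanner to retry,
				// which keeps a down target from flooding the queue.
				t.op = healOp
				t.retryCount++
				queue <- t
			}
		default:
			return // queue drained
		}
	}
}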