Avoid metadata update for incoming replication failure (#12054)

This is an optimization to save IOPS. Replication failures
are re-queued once to re-attempt replication. If the retry
also fails, the replication status is set to `FAILED` and
the object is caught up on the next scanner cycle.
Author: Poorna Krishnamoorthy
Date: 2021-04-15 16:32:00 -07:00
Commit: d30c5d1cf0 (parent 75ac4ea840)
4 changed files with 53 additions and 36 deletions

@@ -571,7 +571,8 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
 // replicateObject replicates the specified version of the object to destination bucket
 // The source object is then updated to reflect the replication status.
-func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
+func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
+	objInfo := ri.ObjectInfo
 	bucket := objInfo.Bucket
 	object := objInfo.Name
@@ -740,34 +741,42 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
 	if !ok {
 		return
 	}
-	// This lower level implementation is necessary to avoid write locks from CopyObject.
-	poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
-	if err != nil {
-		logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
-	} else {
-		fi := FileInfo{}
-		fi.VersionID = objInfo.VersionID
-		fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
-		for k, v := range objInfo.UserDefined {
-			fi.Metadata[k] = v
-		}
-		if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
-			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
-		}
-	}
-	opType := replication.MetadataReplicationType
-	if rtype == replicateAll {
-		opType = replication.ObjectReplicationType
-	}
-	globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
-	sendEvent(eventArgs{
-		EventName:  eventName,
-		BucketName: bucket,
-		Object:     objInfo,
-		Host:       "Internal: [Replication]",
-	})
+	// Leave metadata in `PENDING` state if inline replication fails to save iops
+	if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed {
+		// This lower level implementation is necessary to avoid write locks from CopyObject.
+		poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
+		if err != nil {
+			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+		} else {
+			fi := FileInfo{}
+			fi.VersionID = objInfo.VersionID
+			fi.Metadata = make(map[string]string, len(objInfo.UserDefined))
+			for k, v := range objInfo.UserDefined {
+				fi.Metadata[k] = v
+			}
+			if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, fi); err != nil {
+				logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+			}
+		}
+		opType := replication.MetadataReplicationType
+		if rtype == replicateAll {
+			opType = replication.ObjectReplicationType
+		}
+		globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
+		sendEvent(eventArgs{
+			EventName:  eventName,
+			BucketName: bucket,
+			Object:     objInfo,
+			Host:       "Internal: [Replication]",
+		})
+	}
+	// re-queue failures once more - keep a retry count to avoid flooding the queue if
+	// the target site is down. Leave it to scanner to catch up instead.
+	if replicationStatus == replication.Failed && ri.RetryCount < 1 {
+		ri.OpType = replication.HealReplicationType
+		ri.RetryCount++
+		globalReplicationPool.queueReplicaTask(ctx, ri)
+	}
 }
 
 // filterReplicationStatusMetadata filters replication status metadata for COPY
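The IOPS saving in the hunk above comes from the new guard around the metadata write: on an inline (first-pass) attempt, a failure no longer calls updateObjectMeta, since the on-disk status is already `PENDING`; only heal/scanner passes, or replications that completed, persist a status change. A condensed restatement of that condition, reusing the StatusType values from the earlier sketch (persistReplicationMeta is a hypothetical name):

// persistReplicationMeta mirrors the guard
// ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed:
// heal passes must record FAILED so the scanner can converge, while inline
// passes write metadata only on success and otherwise leave PENDING untouched.
func persistReplicationMeta(opType string, status StatusType) bool {
	return opType == "heal" || status == Completed
}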
@@ -808,9 +817,9 @@ var (
 type ReplicationPool struct {
 	mu                 sync.Mutex
 	size               int
-	replicaCh          chan ObjectInfo
+	replicaCh          chan ReplicateObjectInfo
 	replicaDeleteCh    chan DeletedObjectVersionInfo
-	mrfReplicaCh       chan ObjectInfo
+	mrfReplicaCh       chan ReplicateObjectInfo
 	mrfReplicaDeleteCh chan DeletedObjectVersionInfo
 	killCh             chan struct{}
 	wg                 sync.WaitGroup
@@ -821,9 +830,9 @@ type ReplicationPool struct {
 // NewReplicationPool creates a pool of replication workers of specified size
 func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
 	pool := &ReplicationPool{
-		replicaCh:          make(chan ObjectInfo, 1000),
+		replicaCh:          make(chan ReplicateObjectInfo, 1000),
 		replicaDeleteCh:    make(chan DeletedObjectVersionInfo, 1000),
-		mrfReplicaCh:       make(chan ObjectInfo, 100000),
+		mrfReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 		mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
 		ctx:                ctx,
 		objLayer:           o,
@@ -895,7 +904,7 @@ func (p *ReplicationPool) Resize(n int) {
}
}
func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) {
if p == nil {
return
}
@@ -903,8 +912,8 @@ func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
 	case <-ctx.Done():
 		close(p.replicaCh)
 		close(p.mrfReplicaCh)
-	case p.replicaCh <- oi:
-	case p.mrfReplicaCh <- oi:
+	case p.replicaCh <- ri:
+	case p.mrfReplicaCh <- ri:
 		// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
 	default:
 	}
@@ -1065,9 +1074,9 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op
 func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
 	if sync {
-		replicateObject(ctx, objInfo, o)
+		replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o)
 	} else {
-		globalReplicationPool.queueReplicaTask(GlobalContext, objInfo)
+		globalReplicationPool.queueReplicaTask(GlobalContext, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
 	}
 	if sz, err := objInfo.GetActualSize(); err == nil {
 		globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)