diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 4b1936d6f..5476c4003 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -799,7 +799,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje if replicationStatus != replication.Completed && ri.RetryCount < 1 { ri.OpType = replication.HealReplicationType ri.RetryCount++ - globalReplicationPool.queueReplicaTask(ctx, ri) + globalReplicationPool.queueReplicaFailedTask(ri) } } @@ -839,31 +839,29 @@ var ( // ReplicationPool describes replication pool type ReplicationPool struct { - objLayer ObjectLayer - ctx context.Context - mrfWorkerKillCh chan struct{} - workerKillCh chan struct{} - mrfReplicaDeleteCh chan DeletedObjectVersionInfo - replicaCh chan ReplicateObjectInfo - replicaDeleteCh chan DeletedObjectVersionInfo - mrfReplicaCh chan ReplicateObjectInfo - workerSize int - mrfWorkerSize int - workerWg sync.WaitGroup - mrfWorkerWg sync.WaitGroup - once sync.Once - mu sync.Mutex + objLayer ObjectLayer + ctx context.Context + mrfWorkerKillCh chan struct{} + workerKillCh chan struct{} + replicaCh chan ReplicateObjectInfo + replicaDeleteCh chan DeletedObjectVersionInfo + mrfReplicaCh chan ReplicateObjectInfo + workerSize int + mrfWorkerSize int + workerWg sync.WaitGroup + mrfWorkerWg sync.WaitGroup + once sync.Once + mu sync.Mutex } // NewReplicationPool creates a pool of replication workers of specified size func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool { pool := &ReplicationPool{ - replicaCh: make(chan ReplicateObjectInfo, 10000), - replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), - mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), - mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), - ctx: ctx, - objLayer: o, + replicaCh: make(chan ReplicateObjectInfo, 100000), + replicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), + mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), + ctx: ctx, + objLayer: o, } pool.ResizeWorkers(opts.Workers) pool.ResizeFailedWorkers(opts.FailedWorkers) @@ -882,11 +880,6 @@ func (p *ReplicationPool) AddMRFWorker() { return } replicateObject(p.ctx, oi, p.objLayer) - case doi, ok := <-p.mrfReplicaDeleteCh: - if !ok { - return - } - replicateDelete(p.ctx, doi, p.objLayer) } } } @@ -947,36 +940,46 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) { } } -func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) { +func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { if p == nil { return } select { - case <-ctx.Done(): + case <-GlobalContext.Done(): + p.once.Do(func() { + close(p.replicaCh) + close(p.mrfReplicaCh) + }) + case p.mrfReplicaCh <- ri: + default: + } +} + +func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { + if p == nil { + return + } + select { + case <-GlobalContext.Done(): p.once.Do(func() { close(p.replicaCh) close(p.mrfReplicaCh) }) case p.replicaCh <- ri: - case p.mrfReplicaCh <- ri: - // queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations default: } } -func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi DeletedObjectVersionInfo) { +func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) { if p == nil { return } select { - case <-ctx.Done(): + case <-GlobalContext.Done(): p.once.Do(func() { close(p.replicaDeleteCh) - close(p.mrfReplicaDeleteCh) }) case p.replicaDeleteCh <- doi: - case p.mrfReplicaDeleteCh <- doi: - // queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations default: } } @@ -1134,7 +1137,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, if sync { replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o) } else { - globalReplicationPool.queueReplicaTask(GlobalContext, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}) + globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}) } if sz, err := objInfo.GetActualSize(); err == nil { globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) @@ -1142,6 +1145,6 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, } func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) { - globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv) + globalReplicationPool.queueReplicaDeleteTask(dv) globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 8a4865f04..746c4a2d8 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1136,11 +1136,11 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj case replication.Pending: sizeS.pendingCount++ sizeS.pendingSize += oi.Size - globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) + globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) case replication.Failed: sizeS.failedSize += oi.Size sizeS.failedCount++ - globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) + globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) case replication.Completed, "COMPLETE": sizeS.replicatedSize += oi.Size case replication.Replica: @@ -1159,7 +1159,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, } else { versionID = oi.VersionID } - globalReplicationPool.queueReplicaDeleteTask(ctx, DeletedObjectVersionInfo{ + globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: oi.Name, DeleteMarkerVersionID: dmVersionID,