fix: make sure failed requests go only to the failed queue (#12196)

The failed queue should be used for retried requests to
avoid cascading the failures into the incoming queue; this
allows a fairer retry for failed replicas.

Additionally, avoid taking a context in the queue task
to avoid confusion; this simplifies its usage.
This commit is contained in:
Harshavardhana 2021-04-29 18:20:39 -07:00 committed by GitHub
parent 90112b5644
commit 0faa4e6187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 40 deletions

View File

@ -799,7 +799,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if replicationStatus != replication.Completed && ri.RetryCount < 1 { if replicationStatus != replication.Completed && ri.RetryCount < 1 {
ri.OpType = replication.HealReplicationType ri.OpType = replication.HealReplicationType
ri.RetryCount++ ri.RetryCount++
globalReplicationPool.queueReplicaTask(ctx, ri) globalReplicationPool.queueReplicaFailedTask(ri)
} }
} }
@ -839,31 +839,29 @@ var (
// ReplicationPool describes replication pool // ReplicationPool describes replication pool
type ReplicationPool struct { type ReplicationPool struct {
objLayer ObjectLayer objLayer ObjectLayer
ctx context.Context ctx context.Context
mrfWorkerKillCh chan struct{} mrfWorkerKillCh chan struct{}
workerKillCh chan struct{} workerKillCh chan struct{}
mrfReplicaDeleteCh chan DeletedObjectVersionInfo replicaCh chan ReplicateObjectInfo
replicaCh chan ReplicateObjectInfo replicaDeleteCh chan DeletedObjectVersionInfo
replicaDeleteCh chan DeletedObjectVersionInfo mrfReplicaCh chan ReplicateObjectInfo
mrfReplicaCh chan ReplicateObjectInfo workerSize int
workerSize int mrfWorkerSize int
mrfWorkerSize int workerWg sync.WaitGroup
workerWg sync.WaitGroup mrfWorkerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup once sync.Once
once sync.Once mu sync.Mutex
mu sync.Mutex
} }
// NewReplicationPool creates a pool of replication workers of specified size // NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool { func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
pool := &ReplicationPool{ pool := &ReplicationPool{
replicaCh: make(chan ReplicateObjectInfo, 10000), replicaCh: make(chan ReplicateObjectInfo, 100000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), replicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), ctx: ctx,
ctx: ctx, objLayer: o,
objLayer: o,
} }
pool.ResizeWorkers(opts.Workers) pool.ResizeWorkers(opts.Workers)
pool.ResizeFailedWorkers(opts.FailedWorkers) pool.ResizeFailedWorkers(opts.FailedWorkers)
@ -882,11 +880,6 @@ func (p *ReplicationPool) AddMRFWorker() {
return return
} }
replicateObject(p.ctx, oi, p.objLayer) replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.mrfReplicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer)
} }
} }
} }
@ -947,36 +940,46 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
} }
} }
func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) { func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
if p == nil { if p == nil {
return return
} }
select { select {
case <-ctx.Done(): case <-GlobalContext.Done():
p.once.Do(func() {
close(p.replicaCh)
close(p.mrfReplicaCh)
})
case p.mrfReplicaCh <- ri:
default:
}
}
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
}
select {
case <-GlobalContext.Done():
p.once.Do(func() { p.once.Do(func() {
close(p.replicaCh) close(p.replicaCh)
close(p.mrfReplicaCh) close(p.mrfReplicaCh)
}) })
case p.replicaCh <- ri: case p.replicaCh <- ri:
case p.mrfReplicaCh <- ri:
// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
default: default:
} }
} }
func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi DeletedObjectVersionInfo) { func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
if p == nil { if p == nil {
return return
} }
select { select {
case <-ctx.Done(): case <-GlobalContext.Done():
p.once.Do(func() { p.once.Do(func() {
close(p.replicaDeleteCh) close(p.replicaDeleteCh)
close(p.mrfReplicaDeleteCh)
}) })
case p.replicaDeleteCh <- doi: case p.replicaDeleteCh <- doi:
case p.mrfReplicaDeleteCh <- doi:
// queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations
default: default:
} }
} }
@ -1134,7 +1137,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
if sync { if sync {
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o) replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o)
} else { } else {
globalReplicationPool.queueReplicaTask(GlobalContext, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}) globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
} }
if sz, err := objInfo.GetActualSize(); err == nil { if sz, err := objInfo.GetActualSize(); err == nil {
globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
@ -1142,6 +1145,6 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
} }
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) { func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) {
globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv) globalReplicationPool.queueReplicaDeleteTask(dv)
globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
} }

View File

@ -1136,11 +1136,11 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
case replication.Pending: case replication.Pending:
sizeS.pendingCount++ sizeS.pendingCount++
sizeS.pendingSize += oi.Size sizeS.pendingSize += oi.Size
globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
case replication.Failed: case replication.Failed:
sizeS.failedSize += oi.Size sizeS.failedSize += oi.Size
sizeS.failedCount++ sizeS.failedCount++
globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
case replication.Completed, "COMPLETE": case replication.Completed, "COMPLETE":
sizeS.replicatedSize += oi.Size sizeS.replicatedSize += oi.Size
case replication.Replica: case replication.Replica:
@ -1159,7 +1159,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer,
} else { } else {
versionID = oi.VersionID versionID = oi.VersionID
} }
globalReplicationPool.queueReplicaDeleteTask(ctx, DeletedObjectVersionInfo{ globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{
DeletedObject: DeletedObject{ DeletedObject: DeletedObject{
ObjectName: oi.Name, ObjectName: oi.Name,
DeleteMarkerVersionID: dmVersionID, DeleteMarkerVersionID: dmVersionID,