diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 5bcc29698..c3e0cb48f 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -24,6 +24,7 @@ import ( "net/http" "runtime" "strings" + "sync" "time" miniogo "github.com/minio/minio-go/v7" @@ -71,11 +72,16 @@ type expiryTask struct { } type expiryState struct { + once sync.Once expiryCh chan expiryTask } func (es *expiryState) queueExpiryTask(oi ObjectInfo, rmVersion bool) { select { + case <-GlobalContext.Done(): + es.once.Do(func() { + close(es.expiryCh) + }) case es.expiryCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion}: default: } @@ -86,14 +92,9 @@ var ( ) func newExpiryState() *expiryState { - es := &expiryState{ + return &expiryState{ expiryCh: make(chan expiryTask, 10000), } - go func() { - <-GlobalContext.Done() - close(es.expiryCh) - }() - return es } func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { @@ -106,12 +107,17 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { } type transitionState struct { + once sync.Once // add future metrics here transitionCh chan ObjectInfo } func (t *transitionState) queueTransitionTask(oi ObjectInfo) { select { + case <-GlobalContext.Done(): + t.once.Do(func() { + close(t.transitionCh) + }) case t.transitionCh <- oi: default: } @@ -123,19 +129,13 @@ var ( ) func newTransitionState() *transitionState { - // fix minimum concurrent transition to 1 for single CPU setup if globalTransitionConcurrent == 0 { globalTransitionConcurrent = 1 } - ts := &transitionState{ + return &transitionState{ transitionCh: make(chan ObjectInfo, 10000), } - go func() { - <-GlobalContext.Done() - close(ts.transitionCh) - }() - return ts } // addWorker creates a new worker to process tasks diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 3b9026961..f22c1eb44 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -815,6 +815,7 @@ var ( // ReplicationPool describes replication pool type ReplicationPool struct { + once sync.Once mu sync.Mutex size int replicaCh chan ReplicateObjectInfo @@ -910,8 +911,10 @@ func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObje } select { case <-ctx.Done(): - close(p.replicaCh) - close(p.mrfReplicaCh) + p.once.Do(func() { + close(p.replicaCh) + close(p.mrfReplicaCh) + }) case p.replicaCh <- ri: case p.mrfReplicaCh <- ri: // queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations @@ -925,8 +928,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi Delete } select { case <-ctx.Done(): - close(p.replicaDeleteCh) - close(p.mrfReplicaDeleteCh) + p.once.Do(func() { + close(p.replicaDeleteCh) + close(p.mrfReplicaDeleteCh) + }) case p.replicaDeleteCh <- doi: case p.mrfReplicaDeleteCh <- doi: // queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations diff --git a/cmd/server-main.go b/cmd/server-main.go index d808a5c87..0ee4cabf6 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -516,9 +516,9 @@ func serverMain(ctx *cli.Context) { if globalIsErasure { initAutoHeal(GlobalContext, newObject) initBackgroundTransition(GlobalContext, newObject) - initBackgroundExpiry(GlobalContext, newObject) } + initBackgroundExpiry(GlobalContext, newObject) initDataScanner(GlobalContext, newObject) if err = initServer(GlobalContext, newObject); err != nil {