From 990fc415f732f8e5afd2dcd7ae6602af2924e25d Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Tue, 7 Feb 2023 23:11:42 -0800 Subject: [PATCH] Ensure safety of transitionState at startup (#16563) --- cmd/bucket-lifecycle.go | 28 +++++++++++++++++++--------- cmd/server-main.go | 4 ++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index eb3af59b3..e80c40a9a 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -180,20 +180,32 @@ func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) { var globalTransitionState *transitionState -func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionState { +// newTransitionState returns a transitionState object ready to be initialized +// via its Init method. +func newTransitionState(ctx context.Context) *transitionState { return &transitionState{ transitionCh: make(chan transitionTask, 10000), ctx: ctx, - objAPI: objAPI, killCh: make(chan struct{}), lastDayStats: make(map[string]*lastDayTierStats), } } +// Init initializes t with given objAPI and instantiates the configured number +// of transition workers. +func (t *transitionState) Init(objAPI ObjectLayer) { + n := globalAPIConfig.getTransitionWorkers() + t.mu.Lock() + defer t.mu.Unlock() + + t.objAPI = objAPI + t.updateWorkers(n) +} + // PendingTasks returns the number of ILM transition tasks waiting for a worker // goroutine. func (t *transitionState) PendingTasks() int { - return len(globalTransitionState.transitionCh) + return len(t.transitionCh) } // ActiveTasks returns the number of active (ongoing) ILM transition tasks. @@ -259,6 +271,10 @@ func (t *transitionState) UpdateWorkers(n int) { t.mu.Lock() defer t.mu.Unlock() + t.updateWorkers(n) +} + +func (t *transitionState) updateWorkers(n int) { for t.numWorkers < n { go t.worker(t.ctx, t.objAPI) t.numWorkers++ @@ -270,12 +286,6 @@ func (t *transitionState) UpdateWorkers(n int) { } } -func initBackgroundTransition(ctx context.Context, objectAPI ObjectLayer) { - globalTransitionState = newTransitionState(ctx, objectAPI) - n := globalAPIConfig.getTransitionWorkers() - globalTransitionState.UpdateWorkers(n) -} - var errInvalidStorageClass = errors.New("invalid storage class") func validateTransitionTier(lc *lifecycle.Lifecycle) error { diff --git a/cmd/server-main.go b/cmd/server-main.go index feb9f4d70..3678ef389 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -323,6 +323,7 @@ func initAllSubsystems(ctx context.Context) { // Create new ILM tier configuration subsystem globalTierConfigMgr = NewTierConfigMgr() + globalTransitionState = newTransitionState(GlobalContext) globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext) } @@ -674,8 +675,7 @@ func serverMain(ctx *cli.Context) { // Initialize background replication initBackgroundReplication(GlobalContext, newObject) - // Initialize background transition - initBackgroundTransition(GlobalContext, newObject) + globalTransitionState.Init(newObject) // Initialize batch job pool. globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)