Ensure safety of transitionState at startup (#16563)

This commit is contained in:
Krishnan Parthasarathi 2023-02-07 23:11:42 -08:00 committed by GitHub
parent d8daabae9b
commit 990fc415f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 11 deletions

View File

@ -180,20 +180,32 @@ func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) {
var globalTransitionState *transitionState
func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionState {
// newTransitionState returns a transitionState object ready to be initialized
// via its Init method.
func newTransitionState(ctx context.Context) *transitionState {
return &transitionState{
transitionCh: make(chan transitionTask, 10000),
ctx: ctx,
objAPI: objAPI,
killCh: make(chan struct{}),
lastDayStats: make(map[string]*lastDayTierStats),
}
}
// Init initializes t with given objAPI and instantiates the configured number
// of transition workers.
func (t *transitionState) Init(objAPI ObjectLayer) {
n := globalAPIConfig.getTransitionWorkers()
t.mu.Lock()
defer t.mu.Unlock()
t.objAPI = objAPI
t.updateWorkers(n)
}
// PendingTasks returns the number of ILM transition tasks waiting for a worker
// goroutine.
func (t *transitionState) PendingTasks() int {
return len(globalTransitionState.transitionCh)
return len(t.transitionCh)
}
// ActiveTasks returns the number of active (ongoing) ILM transition tasks.
@ -259,6 +271,10 @@ func (t *transitionState) UpdateWorkers(n int) {
t.mu.Lock()
defer t.mu.Unlock()
t.updateWorkers(n)
}
func (t *transitionState) updateWorkers(n int) {
for t.numWorkers < n {
go t.worker(t.ctx, t.objAPI)
t.numWorkers++
@ -270,12 +286,6 @@ func (t *transitionState) UpdateWorkers(n int) {
}
}
func initBackgroundTransition(ctx context.Context, objectAPI ObjectLayer) {
globalTransitionState = newTransitionState(ctx, objectAPI)
n := globalAPIConfig.getTransitionWorkers()
globalTransitionState.UpdateWorkers(n)
}
var errInvalidStorageClass = errors.New("invalid storage class")
func validateTransitionTier(lc *lifecycle.Lifecycle) error {

View File

@ -323,6 +323,7 @@ func initAllSubsystems(ctx context.Context) {
// Create new ILM tier configuration subsystem
globalTierConfigMgr = NewTierConfigMgr()
globalTransitionState = newTransitionState(GlobalContext)
globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext)
}
@ -674,8 +675,7 @@ func serverMain(ctx *cli.Context) {
// Initialize background replication
initBackgroundReplication(GlobalContext, newObject)
// Initialize background transition
initBackgroundTransition(GlobalContext, newObject)
globalTransitionState.Init(newObject)
// Initialize batch job pool.
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)