resume any batch jobs in a goroutine (#20035)

Bonus: move batch job initialization to the last item after all other initialization, 
            allowing for faster startup time for different subsystems.
This commit is contained in:
Harshavardhana
2024-07-03 00:16:05 -07:00
committed by GitHub
parent b6d4a77b94
commit 32d04091a2
7 changed files with 389 additions and 323 deletions

View File

@@ -1815,19 +1815,21 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
jobCancelers: make(map[string]context.CancelFunc),
}
jpool.ResizeWorkers(workers)
jpool.resume()
go jpool.cleanupReports()
randomWait := func() time.Duration {
// randomWait depends on the number of nodes to avoid triggering resume and cleanups at the same time.
return time.Duration(rand.Float64() * float64(time.Duration(globalEndpoints.NEndpoints())*time.Hour))
}
go func() {
jpool.resume(randomWait)
jpool.cleanupReports(randomWait)
}()
return jpool
}
func (j *BatchJobPool) cleanupReports() {
randomWait := func() time.Duration {
// randomWait depends on the number of nodes to avoid triggering the cleanup at the same time
return time.Duration(rand.Float64() * float64(time.Duration(globalEndpoints.NEndpoints())*time.Hour))
}
func (j *BatchJobPool) cleanupReports(randomWait func() time.Duration) {
t := time.NewTimer(randomWait())
defer t.Stop()
@@ -1864,7 +1866,9 @@ func (j *BatchJobPool) cleanupReports() {
}
}
func (j *BatchJobPool) resume() {
func (j *BatchJobPool) resume(randomWait func() time.Duration) {
time.Sleep(randomWait())
results := make(chan itemOrErr[ObjectInfo], 100)
ctx, cancel := context.WithCancel(j.ctx)
defer cancel()

View File

@@ -1030,11 +1030,6 @@ func serverMain(ctx *cli.Context) {
globalTransitionState.Init(newObject)
})
// Initialize batch job pool.
bootstrapTrace("newBatchJobPool", func() {
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
})
// Initialize the license update job
bootstrapTrace("initLicenseUpdateJob", func() {
initLicenseUpdateJob(GlobalContext, newObject)
@@ -1104,6 +1099,11 @@ func serverMain(ctx *cli.Context) {
})
}
// Initialize batch job pool.
bootstrapTrace("newBatchJobPool", func() {
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
})
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
printStartupMessage(getAPIEndpoints(), err)