diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 874352136..7dbda7583 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1989,7 +1989,6 @@ func (j *BatchJobPool) AddWorker() { } } } - job.delete(j.ctx, j.objLayer) j.canceler(job.ID, false) case <-j.workerKillCh: return @@ -2176,11 +2175,42 @@ func (m *batchJobMetrics) purgeJobMetrics() { m.RUnlock() for _, jobID := range toDeleteJobMetrics { m.delete(jobID) + j := BatchJobRequest{ + ID: jobID, + } + j.delete(GlobalContext, newObjectLayerFn()) } } } } +// load metrics from disk on startup +func (m *batchJobMetrics) init(ctx context.Context, objectAPI ObjectLayer) error { + resultCh := make(chan itemOrErr[ObjectInfo]) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobReportsPrefix, resultCh, WalkOptions{}); err != nil { + return err + } + + for result := range resultCh { + if result.Err != nil { + return result.Err + } + ri := &batchJobInfo{} + if err := ri.loadByPath(ctx, objectAPI, result.Item.Name); err != nil { + if !errors.Is(err, errNoSuchJob) { + batchLogIf(ctx, err) + } + continue + } + m.metrics[ri.JobID] = ri + } + return nil +} + func (m *batchJobMetrics) delete(jobID string) { m.Lock() defer m.Unlock() diff --git a/cmd/common-main.go b/cmd/common-main.go index 0670b5de0..3c45b7aa7 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -91,9 +91,6 @@ func init() { logger.Init(GOPATH, GOROOT) logger.RegisterError(config.FmtError) - globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)} - go globalBatchJobsMetrics.purgeJobMetrics() - t, _ := minioVersionToReleaseTime(Version) if !t.IsZero() { globalVersionUnix = uint64(t.Unix()) diff --git a/cmd/server-main.go b/cmd/server-main.go index 98b2e4a23..19accde84 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -1104,6 +1104,11 @@ func serverMain(ctx *cli.Context) { // Initialize batch job pool. bootstrapTrace("newBatchJobPool", func() { globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100) + globalBatchJobsMetrics = batchJobMetrics{ + metrics: make(map[string]*batchJobInfo), + } + go globalBatchJobsMetrics.init(GlobalContext, newObject) + go globalBatchJobsMetrics.purgeJobMetrics() }) // Prints the formatted startup message, if err is not nil then it prints additional information as well.