hold on to batch job stats till cleanup (#20480)

This PR also fixes job stats not available after restart
This commit is contained in:
Poorna 2024-09-24 14:50:11 -07:00 committed by GitHub
parent 2b0156b1fc
commit b2c5819dbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 36 additions and 4 deletions

View File

@ -1989,7 +1989,6 @@ func (j *BatchJobPool) AddWorker() {
} }
} }
} }
job.delete(j.ctx, j.objLayer)
j.canceler(job.ID, false) j.canceler(job.ID, false)
case <-j.workerKillCh: case <-j.workerKillCh:
return return
@ -2176,11 +2175,42 @@ func (m *batchJobMetrics) purgeJobMetrics() {
m.RUnlock() m.RUnlock()
for _, jobID := range toDeleteJobMetrics { for _, jobID := range toDeleteJobMetrics {
m.delete(jobID) m.delete(jobID)
j := BatchJobRequest{
ID: jobID,
}
j.delete(GlobalContext, newObjectLayerFn())
} }
} }
} }
} }
// load metrics from disk on startup
func (m *batchJobMetrics) init(ctx context.Context, objectAPI ObjectLayer) error {
resultCh := make(chan itemOrErr[ObjectInfo])
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobReportsPrefix, resultCh, WalkOptions{}); err != nil {
return err
}
for result := range resultCh {
if result.Err != nil {
return result.Err
}
ri := &batchJobInfo{}
if err := ri.loadByPath(ctx, objectAPI, result.Item.Name); err != nil {
if !errors.Is(err, errNoSuchJob) {
batchLogIf(ctx, err)
}
continue
}
m.metrics[ri.JobID] = ri
}
return nil
}
func (m *batchJobMetrics) delete(jobID string) { func (m *batchJobMetrics) delete(jobID string) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()

View File

@ -91,9 +91,6 @@ func init() {
logger.Init(GOPATH, GOROOT) logger.Init(GOPATH, GOROOT)
logger.RegisterError(config.FmtError) logger.RegisterError(config.FmtError)
globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)}
go globalBatchJobsMetrics.purgeJobMetrics()
t, _ := minioVersionToReleaseTime(Version) t, _ := minioVersionToReleaseTime(Version)
if !t.IsZero() { if !t.IsZero() {
globalVersionUnix = uint64(t.Unix()) globalVersionUnix = uint64(t.Unix())

View File

@ -1104,6 +1104,11 @@ func serverMain(ctx *cli.Context) {
// Initialize batch job pool. // Initialize batch job pool.
bootstrapTrace("newBatchJobPool", func() { bootstrapTrace("newBatchJobPool", func() {
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100) globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
globalBatchJobsMetrics = batchJobMetrics{
metrics: make(map[string]*batchJobInfo),
}
go globalBatchJobsMetrics.init(GlobalContext, newObject)
go globalBatchJobsMetrics.purgeJobMetrics()
}) })
// Prints the formatted startup message, if err is not nil then it prints additional information as well. // Prints the formatted startup message, if err is not nil then it prints additional information as well.