batch-expiry: Save progress regularly in the drives and at the end (#20098)

- Also, fix failure reporting at the end.
- Also, avoid parsing report objects when listing or resuming jobs, this
does not cause any bugs, it is only printing, not useful errors.
This commit is contained in:
Anis Eleuch 2024-07-17 17:42:32 +01:00 committed by GitHub
parent b276651eaa
commit 2e5d792f0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 12 deletions

View File

@ -552,22 +552,25 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
go func() { go func() {
saveTicker := time.NewTicker(10 * time.Second) saveTicker := time.NewTicker(10 * time.Second)
defer saveTicker.Stop() defer saveTicker.Stop()
for { quit := false
after := time.Minute
for !quit {
select { select {
case <-saveTicker.C: case <-saveTicker.C:
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
case <-ctx.Done(): case <-ctx.Done():
// persist in-memory state immediately before exiting due to context cancellation. quit = true
batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job))
return
case <-saverQuitCh: case <-saverQuitCh:
// persist in-memory state immediately to disk. quit = true
batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job))
return
} }
if quit {
// save immediately if we are quitting
after = 0
}
ctx, cancel := context.WithTimeout(GlobalContext, 30*time.Second) // independent context
batchLogIf(ctx, ri.updateAfter(ctx, api, after, job))
cancel()
} }
}() }()
@ -584,7 +587,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
versionsCount int versionsCount int
toDel []expireObjInfo toDel []expireObjInfo
) )
failed := true failed := false
for result := range results { for result := range results {
if result.Err != nil { if result.Err != nil {
failed = true failed = true

View File

@ -1567,6 +1567,9 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
writeErrorResponseJSON(ctx, w, toAPIError(ctx, result.Err), r.URL) writeErrorResponseJSON(ctx, w, toAPIError(ctx, result.Err), r.URL)
return return
} }
if strings.HasPrefix(result.Item.Name, batchJobReportsPrefix+slashSeparator) {
continue
}
req := &BatchJobRequest{} req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, result.Item.Name); err != nil { if err := req.load(ctx, objectAPI, result.Item.Name); err != nil {
if !errors.Is(err, errNoSuchJob) { if !errors.Is(err, errNoSuchJob) {
@ -1883,6 +1886,9 @@ func (j *BatchJobPool) resume(randomWait func() time.Duration) {
batchLogIf(j.ctx, result.Err) batchLogIf(j.ctx, result.Err)
continue continue
} }
if strings.HasPrefix(result.Item.Name, batchJobReportsPrefix+slashSeparator) {
continue
}
// ignore batch-replicate.bin and batch-rotate.bin entries // ignore batch-replicate.bin and batch-rotate.bin entries
if strings.HasSuffix(result.Item.Name, slashSeparator) { if strings.HasSuffix(result.Item.Name, slashSeparator) {
continue continue