From cfd8645843a5d380a8f84820aff8613dedc0b69f Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Tue, 13 Feb 2024 21:03:27 +0530 Subject: [PATCH] fix: update batch replication stats for snowball uploads (#19045) --- cmd/batch-handlers.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index bed11fd5c..cbc3f3a18 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -925,6 +925,21 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, ri.countItem(info.Size, info.DeleteMarker, success) } +func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) { + if ri == nil { + return + } + + ri.mu.Lock() + defer ri.mu.Unlock() + + ri.Bucket = bucket + for i := range batch { + ri.Object = batch[i].Name + ri.countItem(batch[i].Size, batch[i].DeleteMarker, true) + } +} + // Start start the batch replication job, resumes if there was a pending job via "job.ID" func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { ri := &batchJobInfo{ @@ -1087,6 +1102,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba for _, b := range batch { slowCh <- b } + } else { + ri.trackCurrentBucketBatch(r.Source.Bucket, batch) + globalBatchJobsMetrics.save(job.ID, ri) + // persist in-memory state to disk after every 10secs. + logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) } batch = batch[:0] }