diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index bed11fd5c..cbc3f3a18 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -925,6 +925,21 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, ri.countItem(info.Size, info.DeleteMarker, success) } +func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) { + if ri == nil { + return + } + + ri.mu.Lock() + defer ri.mu.Unlock() + + ri.Bucket = bucket + for i := range batch { + ri.Object = batch[i].Name + ri.countItem(batch[i].Size, batch[i].DeleteMarker, true) + } +} + // Start start the batch replication job, resumes if there was a pending job via "job.ID" func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { ri := &batchJobInfo{ @@ -1087,6 +1102,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba for _, b := range batch { slowCh <- b } + } else { + ri.trackCurrentBucketBatch(r.Source.Bucket, batch) + globalBatchJobsMetrics.save(job.ID, ri) + // persist in-memory state to disk after every 10secs. + logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) } batch = batch[:0] }