Avoid Batch Replication Job log spam (#20158)

Only print once per job and error location.

Set default retry to default 1 second wait, and use as minimum.
This commit is contained in:
Klaus Post 2024-07-26 05:55:50 -07:00 committed by GitHub
parent 064f36ca5a
commit 1966668066
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -725,7 +725,7 @@ const (
batchReplJobAPIVersion = "v1" batchReplJobAPIVersion = "v1"
batchReplJobDefaultRetries = 3 batchReplJobDefaultRetries = 3
batchReplJobDefaultRetryDelay = 250 * time.Millisecond batchReplJobDefaultRetryDelay = time.Second
) )
func getJobPath(job BatchJobRequest) string { func getJobPath(job BatchJobRequest) string {
@ -1016,7 +1016,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
lastObject := ri.Object lastObject := ri.Object
delay := job.Replicate.Flags.Retry.Delay delay := job.Replicate.Flags.Retry.Delay
if delay == 0 { if delay < time.Second {
delay = batchReplJobDefaultRetryDelay delay = batchReplJobDefaultRetryDelay
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -1115,7 +1115,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
slowCh = make(chan itemOrErr[ObjectInfo], 100) slowCh = make(chan itemOrErr[ObjectInfo], 100)
) )
if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { if r.Source.Snowball.Disable != nil && !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
go func() { go func() {
// Snowball currently needs the high level minio-go Client, not the Core one // Snowball currently needs the high level minio-go Client, not the Core one
cl, err := miniogo.New(u.Host, &miniogo.Options{ cl, err := miniogo.New(u.Host, &miniogo.Options{
@ -1125,7 +1125,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
BucketLookup: lookupStyle(r.Target.Path), BucketLookup: lookupStyle(r.Target.Path),
}) })
if err != nil { if err != nil {
batchLogIf(ctx, err) batchLogOnceIf(ctx, err, job.ID+"miniogo.New")
return return
} }
@ -1136,7 +1136,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
writeFn := func(batch []ObjectInfo) { writeFn := func(batch []ObjectInfo) {
if len(batch) > 0 { if len(batch) > 0 {
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err) batchLogOnceIf(ctx, err, job.ID+"writeAsArchive")
for _, b := range batch { for _, b := range batch {
slowCh <- itemOrErr[ObjectInfo]{Item: b} slowCh <- itemOrErr[ObjectInfo]{Item: b}
} }
@ -1144,7 +1144,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
ri.trackCurrentBucketBatch(r.Source.Bucket, batch) ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs. // persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job), job.ID+"updateAfter")
} }
} }
} }
@ -1204,7 +1204,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
for res := range slowCh { for res := range slowCh {
if res.Err != nil { if res.Err != nil {
ri.Failed = true ri.Failed = true
batchLogIf(ctx, res.Err) batchLogOnceIf(ctx, res.Err, job.ID+"res.Err")
continue continue
} }
result := res.Item result := res.Item
@ -1231,7 +1231,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return return
} }
stopFn(result, err) stopFn(result, err)
batchLogIf(ctx, err) batchLogOnceIf(ctx, err, job.ID+"ReplicateToTarget")
success = false success = false
} else { } else {
stopFn(result, nil) stopFn(result, nil)
@ -1239,7 +1239,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts) ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs. // persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job), job.ID+"updateAfter2")
if wait := globalBatchConfig.ReplicationWait(); wait > 0 { if wait := globalBatchConfig.ReplicationWait(); wait > 0 {
time.Sleep(wait) time.Sleep(wait)
@ -1254,10 +1254,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk. // persist in-memory state to disk.
batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job)) batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 0, job), job.ID+"updateAfter3")
if err := r.Notify(ctx, ri); err != nil { if err := r.Notify(ctx, ri); err != nil {
batchLogIf(ctx, fmt.Errorf("unable to notify %v", err)) batchLogOnceIf(ctx, fmt.Errorf("unable to notify %v", err), job.ID+"notify")
} }
cancel() cancel()