diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 88ecec4a7..a12b36219 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -725,7 +725,7 @@ const ( batchReplJobAPIVersion = "v1" batchReplJobDefaultRetries = 3 - batchReplJobDefaultRetryDelay = 250 * time.Millisecond + batchReplJobDefaultRetryDelay = time.Second ) func getJobPath(job BatchJobRequest) string { @@ -1016,7 +1016,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba lastObject := ri.Object delay := job.Replicate.Flags.Retry.Delay - if delay == 0 { + if delay < time.Second { delay = batchReplJobDefaultRetryDelay } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -1115,7 +1115,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba slowCh = make(chan itemOrErr[ObjectInfo], 100) ) - if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { + if r.Source.Snowball.Disable != nil && !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { go func() { // Snowball currently needs the high level minio-go Client, not the Core one cl, err := miniogo.New(u.Host, &miniogo.Options{ @@ -1125,7 +1125,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba BucketLookup: lookupStyle(r.Target.Path), }) if err != nil { - batchLogIf(ctx, err) + batchLogOnceIf(ctx, err, job.ID+"miniogo.New") return } @@ -1136,7 +1136,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba writeFn := func(batch []ObjectInfo) { if len(batch) > 0 { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { - batchLogIf(ctx, err) + batchLogOnceIf(ctx, err, job.ID+"writeAsArchive") for _, b := range batch { slowCh <- itemOrErr[ObjectInfo]{Item: b} } @@ -1144,7 +1144,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba ri.trackCurrentBucketBatch(r.Source.Bucket, batch) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. - batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job), job.ID+"updateAfter") } } } @@ -1204,7 +1204,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba for res := range slowCh { if res.Err != nil { ri.Failed = true - batchLogIf(ctx, res.Err) + batchLogOnceIf(ctx, res.Err, job.ID+"res.Err") continue } result := res.Item @@ -1231,7 +1231,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba return } stopFn(result, err) - batchLogIf(ctx, err) + batchLogOnceIf(ctx, err, job.ID+"ReplicateToTarget") success = false } else { stopFn(result, nil) @@ -1239,7 +1239,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. - batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job), job.ID+"updateAfter2") if wait := globalBatchConfig.ReplicationWait(); wait > 0 { time.Sleep(wait) @@ -1254,10 +1254,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk. - batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job)) + batchLogOnceIf(ctx, ri.updateAfter(ctx, api, 0, job), job.ID+"updateAfter3") if err := r.Notify(ctx, ri); err != nil { - batchLogIf(ctx, fmt.Errorf("unable to notify %v", err)) + batchLogOnceIf(ctx, fmt.Errorf("unable to notify %v", err), job.ID+"notify") } cancel()