From fa5d9c02efceb09b89a51d50a7995ec6f05be916 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Wed, 18 Sep 2024 18:59:03 +0100 Subject: [PATCH] batch: Set a default retry attempts and a prefix (#20452) A batch job will fail if the retry attempt is not provided. The reason is that the code mistakenly gets the retry attempts from the job status rather than the job yaml file. This will also set a default empty prefix for batch expiration. Also this will avoid trimming the prefix since the yaml decoder already does that if no quotes were provided, and we should not trim if quotes were provided and the user provided a leading or a trailing space. --- cmd/batch-expire.go | 16 +++++++++++----- cmd/batch-handlers.go | 31 +++++++++++++------------------ cmd/batch-rotate.go | 7 +++++-- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index 7c7d1e1fb..619b3b5ec 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -27,7 +27,6 @@ import ( "net/http" "runtime" "strconv" - "strings" "time" "github.com/minio/minio-go/v7/pkg/tags" @@ -389,9 +388,12 @@ func (oiCache objInfoCache) Get(toDel ObjectToDelete) (*ObjectInfo, bool) { func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo, job BatchJobRequest, api ObjectLayer, wk *workers.Workers, expireCh <-chan []expireObjInfo) { vc, _ := globalBucketVersioningSys.Get(r.Bucket) - retryAttempts := r.Retry.Attempts + retryAttempts := job.Expire.Retry.Attempts + if retryAttempts <= 0 { + retryAttempts = batchExpireJobDefaultRetries + } delay := job.Expire.Retry.Delay - if delay == 0 { + if delay <= 0 { delay = batchExpireJobDefaultRetryDelay } @@ -557,9 +559,13 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo results := make(chan itemOrErr[ObjectInfo], workerSize) go func() { - for _, prefix := range r.Prefix.F() { + prefixes := r.Prefix.F() + if len(prefixes) == 0 { + prefixes = []string{""} + } + for _, prefix := range prefixes { prefixResultCh := make(chan itemOrErr[ObjectInfo], workerSize) - err := api.Walk(ctx, r.Bucket, strings.TrimSpace(prefix), prefixResultCh, WalkOptions{ + err := api.Walk(ctx, r.Bucket, prefix, prefixResultCh, WalkOptions{ Marker: lastObject, LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions VersionsSort: WalkVersionsSortDesc, diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index d129d09e6..874352136 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -276,8 +276,12 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay } globalBatchJobsMetrics.save(job.ID, ri) + retryAttempts := job.Replicate.Flags.Retry.Attempts + if retryAttempts <= 0 { + retryAttempts = batchReplJobDefaultRetries + } delay := job.Replicate.Flags.Retry.Delay - if delay == 0 { + if delay <= 0 { delay = batchReplJobDefaultRetryDelay } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -377,7 +381,6 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay return err } - retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts @@ -394,7 +397,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay } for _, prefix := range prefixes { prefixObjInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{ - Prefix: strings.TrimSpace(prefix), + Prefix: prefix, WithVersions: minioSrc, Recursive: true, WithMetadata: true, @@ -772,22 +775,10 @@ func (ri *batchJobInfo) loadOrInit(ctx context.Context, api ObjectLayer, job Bat switch { case job.Replicate != nil: ri.Version = batchReplVersionV1 - ri.RetryAttempts = batchReplJobDefaultRetries - if job.Replicate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts - } case job.KeyRotate != nil: ri.Version = batchKeyRotateVersionV1 - ri.RetryAttempts = batchKeyRotateJobDefaultRetries - if job.KeyRotate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts - } case job.Expire != nil: ri.Version = batchExpireVersionV1 - ri.RetryAttempts = batchExpireJobDefaultRetries - if job.Expire.Retry.Attempts > 0 { - ri.RetryAttempts = job.Expire.Retry.Attempts - } } return nil } @@ -1034,10 +1025,15 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba globalBatchJobsMetrics.save(job.ID, ri) lastObject := ri.Object + retryAttempts := job.Replicate.Flags.Retry.Attempts + if retryAttempts <= 0 { + retryAttempts = batchReplJobDefaultRetries + } delay := job.Replicate.Flags.Retry.Delay - if delay < time.Second { + if delay <= 0 { delay = batchReplJobDefaultRetryDelay } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) selectObj := func(info FileInfo) (ok bool) { @@ -1125,7 +1121,6 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) - retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts @@ -1214,7 +1209,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba } for _, prefix := range prefixes { prefixWalkCh := make(chan itemOrErr[ObjectInfo], 100) - if err := api.Walk(ctx, r.Source.Bucket, strings.TrimSpace(prefix), prefixWalkCh, WalkOptions{ + if err := api.Walk(ctx, r.Source.Bucket, prefix, prefixWalkCh, WalkOptions{ Marker: lastObject, Filter: selectObj, AskDisks: walkQuorum, diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index 4bb0a9384..24414e270 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -267,8 +267,12 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba globalBatchJobsMetrics.save(job.ID, ri) lastObject := ri.Object + retryAttempts := job.KeyRotate.Flags.Retry.Attempts + if retryAttempts <= 0 { + retryAttempts = batchKeyRotateJobDefaultRetries + } delay := job.KeyRotate.Flags.Retry.Delay - if delay == 0 { + if delay <= 0 { delay = batchKeyRotateJobDefaultRetryDelay } @@ -354,7 +358,6 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba return err } - retryAttempts := ri.RetryAttempts ctx, cancel := context.WithCancel(ctx) results := make(chan itemOrErr[ObjectInfo], 100)