batch: Set a default retry attempts and a prefix (#20452)

A batch job will fail if the retry attempt is not provided. The reason
is that the code mistakenly gets the retry attempts from the job status
rather than the job yaml file.

This will also set a default empty prefix for batch expiration.

Also this will avoid trimming the prefix since the yaml decoder already
does that if no quotes were provided, and we should not trim if quotes
were provided and the user provided a leading or a trailing space.
This commit is contained in:
Anis Eleuch 2024-09-18 18:59:03 +01:00 committed by GitHub
parent 5bd27346ac
commit fa5d9c02ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 25 deletions

View File

@ -27,7 +27,6 @@ import (
"net/http" "net/http"
"runtime" "runtime"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio-go/v7/pkg/tags"
@ -389,9 +388,12 @@ func (oiCache objInfoCache) Get(toDel ObjectToDelete) (*ObjectInfo, bool) {
func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo, job BatchJobRequest, api ObjectLayer, wk *workers.Workers, expireCh <-chan []expireObjInfo) { func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo, job BatchJobRequest, api ObjectLayer, wk *workers.Workers, expireCh <-chan []expireObjInfo) {
vc, _ := globalBucketVersioningSys.Get(r.Bucket) vc, _ := globalBucketVersioningSys.Get(r.Bucket)
retryAttempts := r.Retry.Attempts retryAttempts := job.Expire.Retry.Attempts
if retryAttempts <= 0 {
retryAttempts = batchExpireJobDefaultRetries
}
delay := job.Expire.Retry.Delay delay := job.Expire.Retry.Delay
if delay == 0 { if delay <= 0 {
delay = batchExpireJobDefaultRetryDelay delay = batchExpireJobDefaultRetryDelay
} }
@ -557,9 +559,13 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
results := make(chan itemOrErr[ObjectInfo], workerSize) results := make(chan itemOrErr[ObjectInfo], workerSize)
go func() { go func() {
for _, prefix := range r.Prefix.F() { prefixes := r.Prefix.F()
if len(prefixes) == 0 {
prefixes = []string{""}
}
for _, prefix := range prefixes {
prefixResultCh := make(chan itemOrErr[ObjectInfo], workerSize) prefixResultCh := make(chan itemOrErr[ObjectInfo], workerSize)
err := api.Walk(ctx, r.Bucket, strings.TrimSpace(prefix), prefixResultCh, WalkOptions{ err := api.Walk(ctx, r.Bucket, prefix, prefixResultCh, WalkOptions{
Marker: lastObject, Marker: lastObject,
LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions
VersionsSort: WalkVersionsSortDesc, VersionsSort: WalkVersionsSortDesc,

View File

@ -276,8 +276,12 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} }
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
retryAttempts := job.Replicate.Flags.Retry.Attempts
if retryAttempts <= 0 {
retryAttempts = batchReplJobDefaultRetries
}
delay := job.Replicate.Flags.Retry.Delay delay := job.Replicate.Flags.Retry.Delay
if delay == 0 { if delay <= 0 {
delay = batchReplJobDefaultRetryDelay delay = batchReplJobDefaultRetryDelay
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -377,7 +381,6 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
return err return err
} }
retryAttempts := ri.RetryAttempts
retry := false retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ { for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts attempts := attempts
@ -394,7 +397,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} }
for _, prefix := range prefixes { for _, prefix := range prefixes {
prefixObjInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{ prefixObjInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{
Prefix: strings.TrimSpace(prefix), Prefix: prefix,
WithVersions: minioSrc, WithVersions: minioSrc,
Recursive: true, Recursive: true,
WithMetadata: true, WithMetadata: true,
@ -772,22 +775,10 @@ func (ri *batchJobInfo) loadOrInit(ctx context.Context, api ObjectLayer, job Bat
switch { switch {
case job.Replicate != nil: case job.Replicate != nil:
ri.Version = batchReplVersionV1 ri.Version = batchReplVersionV1
ri.RetryAttempts = batchReplJobDefaultRetries
if job.Replicate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts
}
case job.KeyRotate != nil: case job.KeyRotate != nil:
ri.Version = batchKeyRotateVersionV1 ri.Version = batchKeyRotateVersionV1
ri.RetryAttempts = batchKeyRotateJobDefaultRetries
if job.KeyRotate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts
}
case job.Expire != nil: case job.Expire != nil:
ri.Version = batchExpireVersionV1 ri.Version = batchExpireVersionV1
ri.RetryAttempts = batchExpireJobDefaultRetries
if job.Expire.Retry.Attempts > 0 {
ri.RetryAttempts = job.Expire.Retry.Attempts
}
} }
return nil return nil
} }
@ -1034,10 +1025,15 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
lastObject := ri.Object lastObject := ri.Object
retryAttempts := job.Replicate.Flags.Retry.Attempts
if retryAttempts <= 0 {
retryAttempts = batchReplJobDefaultRetries
}
delay := job.Replicate.Flags.Retry.Delay delay := job.Replicate.Flags.Retry.Delay
if delay < time.Second { if delay <= 0 {
delay = batchReplJobDefaultRetryDelay delay = batchReplJobDefaultRetryDelay
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
selectObj := func(info FileInfo) (ok bool) { selectObj := func(info FileInfo) (ok bool) {
@ -1125,7 +1121,6 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
retryAttempts := ri.RetryAttempts
retry := false retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ { for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts attempts := attempts
@ -1214,7 +1209,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} }
for _, prefix := range prefixes { for _, prefix := range prefixes {
prefixWalkCh := make(chan itemOrErr[ObjectInfo], 100) prefixWalkCh := make(chan itemOrErr[ObjectInfo], 100)
if err := api.Walk(ctx, r.Source.Bucket, strings.TrimSpace(prefix), prefixWalkCh, WalkOptions{ if err := api.Walk(ctx, r.Source.Bucket, prefix, prefixWalkCh, WalkOptions{
Marker: lastObject, Marker: lastObject,
Filter: selectObj, Filter: selectObj,
AskDisks: walkQuorum, AskDisks: walkQuorum,

View File

@ -267,8 +267,12 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
lastObject := ri.Object lastObject := ri.Object
retryAttempts := job.KeyRotate.Flags.Retry.Attempts
if retryAttempts <= 0 {
retryAttempts = batchKeyRotateJobDefaultRetries
}
delay := job.KeyRotate.Flags.Retry.Delay delay := job.KeyRotate.Flags.Retry.Delay
if delay == 0 { if delay <= 0 {
delay = batchKeyRotateJobDefaultRetryDelay delay = batchKeyRotateJobDefaultRetryDelay
} }
@ -354,7 +358,6 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return err return err
} }
retryAttempts := ri.RetryAttempts
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
results := make(chan itemOrErr[ObjectInfo], 100) results := make(chan itemOrErr[ObjectInfo], 100)