From 847ee5ac452b265f8acb2cf5280130adb1c9aa35 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 6 May 2024 13:27:52 -0700 Subject: [PATCH] Make WalkDir return errors (#19677) If used, 'opts.Marker` will cause many missed entries since results are returned unsorted, and pools are serialized. Switch to fully concurrent listing and merging across pools to return sorted entries. --- cmd/batch-expire.go | 37 ++++++++++++--------- cmd/batch-handlers.go | 64 ++++++++++++++++++++++++++----------- cmd/batch-handlers_gen.go | 35 +++++++++++++++++--- cmd/batch-rotate.go | 21 ++++++------ cmd/bucket-replication.go | 23 +++++++++---- cmd/erasure-server-pool.go | 31 ++++++++++++------ cmd/iam-object-store.go | 25 ++++++++------- cmd/object-api-interface.go | 2 +- cmd/utils.go | 6 ++++ 9 files changed, 169 insertions(+), 75 deletions(-) diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index 5128f5da4..9d86d6def 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -472,17 +472,17 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo stopFn(toDelCopy[i], err) batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts)) failed++ - if attempts == retryAttempts { // all retry attempts failed, record failure - if oi, ok := oiCache.Get(toDelCopy[i]); ok { - ri.trackCurrentBucketObject(r.Bucket, *oi, false) - } - } else { + if oi, ok := oiCache.Get(toDelCopy[i]); ok { + ri.trackCurrentBucketObject(r.Bucket, *oi, false, attempts) + } + if attempts != retryAttempts { + // retry toDel = append(toDel, toDelCopy[i]) } } else { stopFn(toDelCopy[i], nil) if oi, ok := oiCache.Get(toDelCopy[i]); ok { - ri.trackCurrentBucketObject(r.Bucket, *oi, true) + ri.trackCurrentBucketObject(r.Bucket, *oi, true, attempts) } } } @@ -537,7 +537,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo ctx, cancel := 
context.WithCancel(ctx) defer cancel() - results := make(chan ObjectInfo, workerSize) + results := make(chan itemOrErr[ObjectInfo], workerSize) if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ Marker: lastObject, LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions @@ -584,11 +584,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo versionsCount int toDel []expireObjInfo ) + failed := false for result := range results { + if result.Err != nil { + failed = true + batchLogIf(ctx, result.Err) + continue + } + // Apply filter to find the matching rule to apply expiry // actions accordingly. // nolint:gocritic - if result.IsLatest { + if result.Item.IsLatest { // send down filtered entries to be deleted using // DeleteObjects method if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously. @@ -609,7 +616,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo var match BatchJobExpireFilter var found bool for _, rule := range r.Rules { - if rule.Matches(result, now) { + if rule.Matches(result.Item, now) { match = rule found = true break @@ -619,18 +626,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo continue } - prevObj = result + prevObj = result.Item matchedFilter = match versionsCount = 1 // Include the latest version if matchedFilter.Purge.RetainVersions == 0 { toDel = append(toDel, expireObjInfo{ - ObjectInfo: result, + ObjectInfo: result.Item, ExpireAll: true, }) continue } - } else if prevObj.Name == result.Name { + } else if prevObj.Name == result.Item.Name { if matchedFilter.Purge.RetainVersions == 0 { continue // including latest version in toDel suffices, skipping other versions } @@ -643,7 +650,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo continue // retain versions } toDel = append(toDel, expireObjInfo{ - ObjectInfo: result,
+ ObjectInfo: result.Item, }) } // Send any remaining objects downstream @@ -658,8 +665,8 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo <-expireDoneCh // waits for the expire goroutine to complete wk.Wait() // waits for all expire workers to retire - ri.Complete = ri.ObjectsFailed == 0 - ri.Failed = ri.ObjectsFailed > 0 + ri.Complete = !failed && ri.ObjectsFailed == 0 + ri.Failed = failed || ri.ObjectsFailed > 0 globalBatchJobsMetrics.save(job.ID, ri) // Close the saverQuitCh - this also triggers saving in-memory state diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 12bf67ac6..e3a4d2016 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -447,7 +447,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay } else { stopFn(oi, nil) } - ri.trackCurrentBucketObject(r.Target.Bucket, oi, success) + ri.trackCurrentBucketObject(r.Target.Bucket, oi, success, attempts) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. 
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) @@ -690,6 +690,7 @@ type batchJobInfo struct { StartTime time.Time `json:"startTime" msg:"st"` LastUpdate time.Time `json:"lastUpdate" msg:"lu"` RetryAttempts int `json:"retryAttempts" msg:"ra"` + Attempts int `json:"attempts" msg:"at"` Complete bool `json:"complete" msg:"cmp"` Failed bool `json:"failed" msg:"fld"` @@ -833,13 +834,15 @@ func (ri *batchJobInfo) clone() *batchJobInfo { ObjectsFailed: ri.ObjectsFailed, BytesTransferred: ri.BytesTransferred, BytesFailed: ri.BytesFailed, + Attempts: ri.Attempts, } } -func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) { +func (ri *batchJobInfo) countItem(size int64, dmarker, success bool, attempt int) { if ri == nil { return } + ri.Attempts++ if success { if dmarker { ri.DeleteMarkers++ @@ -847,7 +850,19 @@ ri.Objects++ ri.BytesTransferred += size } + if attempt > 1 { + if dmarker { + ri.DeleteMarkersFailed-- + } else { + ri.ObjectsFailed-- + ri.BytesFailed -= size + } + } } else { + if attempt > 1 { + // Only count first attempt + return + } if dmarker { ri.DeleteMarkersFailed++ } else { @@ -921,7 +936,7 @@ func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectIn } } -func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) { +func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool, attempt int) { if ri == nil { return } @@ -931,7 +946,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, ri.Bucket = bucket ri.Object = info.Name - ri.countItem(info.Size, info.DeleteMarker, success) + ri.countItem(info.Size, info.DeleteMarker, success, attempt) } func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) { @@ -945,7 +960,7 @@ func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInf ri.Bucket =
bucket for i := range batch { ri.Object = batch[i].Name - ri.countItem(batch[i].Size, batch[i].DeleteMarker, true) + ri.countItem(batch[i].Size, batch[i].DeleteMarker, true, 1) } } @@ -1057,8 +1072,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) var ( - walkCh = make(chan ObjectInfo, 100) - slowCh = make(chan ObjectInfo, 100) + walkCh = make(chan itemOrErr[ObjectInfo], 100) + slowCh = make(chan itemOrErr[ObjectInfo], 100) ) if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { @@ -1084,7 +1099,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { batchLogIf(ctx, err) for _, b := range batch { - slowCh <- b + slowCh <- itemOrErr[ObjectInfo]{Item: b} } } else { ri.trackCurrentBucketBatch(r.Source.Bucket, batch) @@ -1095,12 +1110,12 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba } } for obj := range walkCh { - if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { + if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) { slowCh <- obj continue } - batch = append(batch, obj) + batch = append(batch, obj.Item) if len(batch) < *r.Source.Snowball.Batch { continue @@ -1153,8 +1168,13 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba prevObj := "" skipReplicate := false - for result := range slowCh { - result := result + for res := range slowCh { + if res.Err != nil { + ri.Failed = true + batchLogIf(ctx, res.Err) + continue + } + result := res.Item if result.Name != prevObj { prevObj = result.Name skipReplicate = result.DeleteMarker && s3Type @@ -1183,7 +1203,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba } else { stopFn(result, nil) } - 
ri.trackCurrentBucketObject(r.Source.Bucket, result, success) + ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) @@ -1484,7 +1504,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) jobType = string(madmin.BatchJobReplicate) } - resultCh := make(chan ObjectInfo) + resultCh := make(chan itemOrErr[ObjectInfo]) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -1496,8 +1516,12 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) listResult := madmin.ListBatchJobsResult{} for result := range resultCh { + if result.Err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, result.Err), r.URL) + return + } req := &BatchJobRequest{} - if err := req.load(ctx, objectAPI, result.Name); err != nil { + if err := req.load(ctx, objectAPI, result.Item.Name); err != nil { if !errors.Is(err, errNoSuchJob) { batchLogIf(ctx, err) } @@ -1702,7 +1726,7 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP } func (j *BatchJobPool) resume() { - results := make(chan ObjectInfo, 100) + results := make(chan itemOrErr[ObjectInfo], 100) ctx, cancel := context.WithCancel(j.ctx) defer cancel() if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil { @@ -1710,12 +1734,16 @@ func (j *BatchJobPool) resume() { return } for result := range results { + if result.Err != nil { + batchLogIf(j.ctx, result.Err) + continue + } // ignore batch-replicate.bin and batch-rotate.bin entries - if strings.HasSuffix(result.Name, slashSeparator) { + if strings.HasSuffix(result.Item.Name, slashSeparator) { continue } req := &BatchJobRequest{} - if err := req.load(ctx, j.objLayer, result.Name); err != nil { + if err := req.load(ctx, j.objLayer, result.Item.Name); err != nil { batchLogIf(ctx, 
err) continue } diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 09d3cec12..4cae65173 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -419,6 +419,12 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "at": + z.Attempts, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Attempts") + return + } case "cmp": z.Complete, err = dc.ReadBool() if err != nil { @@ -492,9 +498,9 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 16 + // map header, size 17 // write "v" - err = en.Append(0xde, 0x0, 0x10, 0xa1, 0x76) + err = en.Append(0xde, 0x0, 0x11, 0xa1, 0x76) if err != nil { return } @@ -553,6 +559,16 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + // write "at" + err = en.Append(0xa2, 0x61, 0x74) + if err != nil { + return + } + err = en.WriteInt(z.Attempts) + if err != nil { + err = msgp.WrapError(err, "Attempts") + return + } // write "cmp" err = en.Append(0xa3, 0x63, 0x6d, 0x70) if err != nil { @@ -659,9 +675,9 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 16 + // map header, size 17 // string "v" - o = append(o, 0xde, 0x0, 0x10, 0xa1, 0x76) + o = append(o, 0xde, 0x0, 0x11, 0xa1, 0x76) o = msgp.AppendInt(o, z.Version) // string "jid" o = append(o, 0xa3, 0x6a, 0x69, 0x64) @@ -678,6 +694,9 @@ func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) { // string "ra" o = append(o, 0xa2, 0x72, 0x61) o = msgp.AppendInt(o, z.RetryAttempts) + // string "at" + o = append(o, 0xa2, 0x61, 0x74) + o = msgp.AppendInt(o, z.Attempts) // string 
"cmp" o = append(o, 0xa3, 0x63, 0x6d, 0x70) o = msgp.AppendBool(o, z.Complete) @@ -765,6 +784,12 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "at": + z.Attempts, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Attempts") + return + } case "cmp": z.Complete, bts, err = msgp.ReadBoolBytes(bts) if err != nil { @@ -839,6 +864,6 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *batchJobInfo) Msgsize() (s int) { - s = 3 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + s = 3 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size return } diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index b8d6dc2cc..9ba7587ff 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -356,7 +356,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba retryAttempts := ri.RetryAttempts ctx, cancel := context.WithCancel(ctx) - results := make(chan ObjectInfo, 100) + results := make(chan itemOrErr[ObjectInfo], 100) if err := api.Walk(ctx, r.Bucket, 
r.Prefix, results, WalkOptions{ Marker: lastObject, Filter: selectObj, @@ -365,9 +365,14 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba // Do not need to retry if we can't list objects on source. return err } - - for result := range results { - result := result + failed := false + for res := range results { + if res.Err != nil { + failed = true + batchLogIf(ctx, res.Err) + break + } + result := res.Item sseKMS := crypto.S3KMS.IsEncrypted(result.UserDefined) sseS3 := crypto.S3.IsEncrypted(result.UserDefined) if !sseKMS && !sseS3 { // neither sse-s3 nor sse-kms disallowed @@ -377,7 +382,6 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba go func() { defer wk.Give() for attempts := 1; attempts <= retryAttempts; attempts++ { - attempts := attempts stopFn := globalBatchJobsMetrics.trace(batchJobMetricKeyRotation, job.ID, attempts) success := true if err := r.KeyRotate(ctx, api, result); err != nil { @@ -387,8 +391,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba } else { stopFn(result, nil) } - ri.trackCurrentBucketObject(r.Bucket, result, success) - ri.RetryAttempts = attempts + ri.trackCurrentBucketObject(r.Bucket, result, success, attempts) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) @@ -407,8 +410,8 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba } wk.Wait() - ri.Complete = ri.ObjectsFailed == 0 - ri.Failed = ri.ObjectsFailed > 0 + ri.Complete = !failed && ri.ObjectsFailed == 0 + ri.Failed = failed || ri.ObjectsFailed > 0 globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk. 
batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job)) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 9593d89ae..eb30a5810 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2732,7 +2732,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object s.workerCh <- struct{}{} }() // Allocate new results channel to receive ObjectInfo. - objInfoCh := make(chan ObjectInfo) + objInfoCh := make(chan itemOrErr[ObjectInfo]) cfg, err := getReplicationConfig(ctx, opts.bucket) if err != nil { replLogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err)) @@ -2867,7 +2867,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object } }(ctx, i) } - for obj := range objInfoCh { + for res := range objInfoCh { + if res.Err != nil { + resyncStatus = ResyncFailed + replLogIf(ctx, res.Err) + return + } select { case <-s.resyncCancelCh: resyncStatus = ResyncCanceled @@ -2876,11 +2881,11 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object return default: } - if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name { + if heal && lastCheckpoint != "" && lastCheckpoint != res.Item.Name { continue } lastCheckpoint = "" - roi := getHealReplicateObjectInfo(obj, rcfg) + roi := getHealReplicateObjectInfo(res.Item, rcfg) if !roi.ExistingObjResync.mustResync() { continue } @@ -3140,7 +3145,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, return nil, err } - objInfoCh := make(chan ObjectInfo, 10) + objInfoCh := make(chan itemOrErr[ObjectInfo], 10) if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil { replLogIf(ctx, err) return nil, err @@ -3152,11 +3157,17 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, diffCh := make(chan madmin.DiffInfo, 4000) go func() { defer xioutil.SafeClose(diffCh) - for obj := range objInfoCh { 
+ for res := range objInfoCh { + if res.Err != nil { + diffCh <- madmin.DiffInfo{Err: res.Err} + return + } if contextCanceled(ctx) { // Just consume input... continue } + obj := res.Item + // Ignore object prefixes which are excluded // from versioning via the MinIO bucket versioning extension. if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 4a1277e1e..1d2192d0e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -2045,13 +2045,13 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts // Walk a bucket, optionally prefix recursively, until we have returned // all the contents of the provided bucket+prefix. // TODO: Note that most errors will result in a truncated listing. -func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error { +func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil { xioutil.SafeClose(results) return err } - - ctx, cancel := context.WithCancel(ctx) + parentCtx := ctx + ctx, cancelCause := context.WithCancelCause(ctx) var entries []chan metaCacheEntry for poolIdx, erasureSet := range z.serverPools { @@ -2062,8 +2062,9 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true) if len(disks) == 0 { xioutil.SafeClose(results) - cancel() - return fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx) + err := fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx) + cancelCause(err) + return err } go func() { defer xioutil.SafeClose(listOut) @@ -2150,8 +2151,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } if err := 
listPathRaw(ctx, lopts); err != nil { - storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)) - cancel() + cancelCause(fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)) return } }() @@ -2162,13 +2162,24 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re merged := make(chan metaCacheEntry, 100) vcfg, _ := globalBucketVersioningSys.Get(bucket) go func() { - defer cancel() + defer cancelCause(nil) defer xioutil.SafeClose(results) + sentErr := false + sendErr := func(err error) { + if !sentErr { + select { + case results <- itemOrErr[ObjectInfo]{Err: err}: + sentErr = true + case <-parentCtx.Done(): + } + } + } send := func(oi ObjectInfo) bool { select { - case results <- oi: + case results <- itemOrErr[ObjectInfo]{Item: oi}: return true case <-ctx.Done(): + sendErr(context.Cause(ctx)) return false } } @@ -2176,6 +2187,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re if opts.LatestOnly { fi, err := entry.fileInfo(bucket) if err != nil { + sendErr(err) return } if opts.Filter != nil { @@ -2193,6 +2205,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } fivs, err := entry.fileInfoVersions(bucket) if err != nil { + sendErr(err) return } diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 53cf2cdee..e7cc954d1 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -645,36 +645,37 @@ func (iamOS *IAMObjectStore) deleteGroupInfo(ctx context.Context, name string) e return err } -// helper type for listIAMConfigItems -type itemOrErr struct { - Item string - Err error -} - // Lists objects in the minioMetaBucket at the given path prefix. All returned // items have the pathPrefix removed from their names. 
-func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix string) <-chan itemOrErr { - ch := make(chan itemOrErr) +func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix string) <-chan itemOrErr[string] { + ch := make(chan itemOrErr[string]) go func() { defer xioutil.SafeClose(ch) // Allocate new results channel to receive ObjectInfo. - objInfoCh := make(chan ObjectInfo) + objInfoCh := make(chan itemOrErr[ObjectInfo]) if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, WalkOptions{}); err != nil { select { - case ch <- itemOrErr{Err: err}: + case ch <- itemOrErr[string]{Err: err}: case <-ctx.Done(): } return } for obj := range objInfoCh { - item := strings.TrimPrefix(obj.Name, pathPrefix) + if obj.Err != nil { + select { + case ch <- itemOrErr[string]{Err: obj.Err}: + case <-ctx.Done(): + return + } + continue + } + item := strings.TrimPrefix(obj.Item.Name, pathPrefix) item = strings.TrimSuffix(item, SlashSeparator) select { - case ch <- itemOrErr{Item: item}: + case ch <- itemOrErr[string]{Item: item}: case <-ctx.Done(): return } diff --git a/cmd/object-api-interface.go index 5e0945a6d..8960b6ca6 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -257,7 +257,7 @@ type ObjectLayer interface { ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error) // Walk lists all objects including versions, delete markers. - Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error + Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error // Object operations.
diff --git a/cmd/utils.go b/cmd/utils.go index 1eacc3419..037b7ce27 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1136,3 +1136,9 @@ func sleepContext(ctx context.Context, d time.Duration) error { } return nil } + +// helper type to return either item or error. +type itemOrErr[V any] struct { + Item V + Err error +}