diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 69f6d84f0..2299f8d2d 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -2082,7 +2082,6 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts // Walk a bucket, optionally prefix recursively, until we have returned // all the contents of the provided bucket+prefix. -// TODO: Note that most errors will result in a truncated listing. func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil { xioutil.SafeClose(results) @@ -2199,9 +2198,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re // Convert and filter merged entries. merged := make(chan metaCacheEntry, 100) vcfg, _ := globalBucketVersioningSys.Get(bucket) + errCh := make(chan error, 1) go func() { - defer cancelCause(nil) - defer xioutil.SafeClose(results) sentErr := false sendErr := func(err error) { if !sentErr { @@ -2212,6 +2210,15 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } } } + defer func() { + select { + case <-ctx.Done(): + sendErr(ctx.Err()) + default: + } + xioutil.SafeClose(results) + cancelCause(nil) + }() send := func(oi ObjectInfo) bool { select { case results <- itemOrErr[ObjectInfo]{Item: oi}: @@ -2266,12 +2273,16 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } } } + if err := <-errCh; err != nil { + sendErr(err) + } }() go func() { + defer close(errCh) // Merge all entries from all disks. // We leave quorum at 1, since entries are already resolved to have the desired quorum. // mergeEntryChannels will close 'merged' channel upon completion or cancellation. - storageLogIf(ctx, mergeEntryChannels(ctx, entries, merged, 1)) + errCh <- mergeEntryChannels(ctx, entries, merged, 1) }() return nil