mirror of
https://github.com/minio/minio.git
synced 2025-04-19 10:07:30 -04:00
Return error from mergeEntryChannels (#19970)
- Add error from mergeEntryChannels to `results.` - Make sure we check the context error before we close the channel.
This commit is contained in:
parent
dfab400d43
commit
2d7a3d1516
@ -2082,7 +2082,6 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
|
|||||||
|
|
||||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||||
// all the contents of the provided bucket+prefix.
|
// all the contents of the provided bucket+prefix.
|
||||||
// TODO: Note that most errors will result in a truncated listing.
|
|
||||||
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error {
|
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error {
|
||||||
if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil {
|
if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil {
|
||||||
xioutil.SafeClose(results)
|
xioutil.SafeClose(results)
|
||||||
@ -2199,9 +2198,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
// Convert and filter merged entries.
|
// Convert and filter merged entries.
|
||||||
merged := make(chan metaCacheEntry, 100)
|
merged := make(chan metaCacheEntry, 100)
|
||||||
vcfg, _ := globalBucketVersioningSys.Get(bucket)
|
vcfg, _ := globalBucketVersioningSys.Get(bucket)
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
defer cancelCause(nil)
|
|
||||||
defer xioutil.SafeClose(results)
|
|
||||||
sentErr := false
|
sentErr := false
|
||||||
sendErr := func(err error) {
|
sendErr := func(err error) {
|
||||||
if !sentErr {
|
if !sentErr {
|
||||||
@ -2212,6 +2210,15 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
sendErr(ctx.Err())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
xioutil.SafeClose(results)
|
||||||
|
cancelCause(nil)
|
||||||
|
}()
|
||||||
send := func(oi ObjectInfo) bool {
|
send := func(oi ObjectInfo) bool {
|
||||||
select {
|
select {
|
||||||
case results <- itemOrErr[ObjectInfo]{Item: oi}:
|
case results <- itemOrErr[ObjectInfo]{Item: oi}:
|
||||||
@ -2266,12 +2273,16 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
sendErr(err)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(errCh)
|
||||||
// Merge all entries from all disks.
|
// Merge all entries from all disks.
|
||||||
// We leave quorum at 1, since entries are already resolved to have the desired quorum.
|
// We leave quorum at 1, since entries are already resolved to have the desired quorum.
|
||||||
// mergeEntryChannels will close 'merged' channel upon completion or cancellation.
|
// mergeEntryChannels will close 'merged' channel upon completion or cancellation.
|
||||||
storageLogIf(ctx, mergeEntryChannels(ctx, entries, merged, 1))
|
errCh <- mergeEntryChannels(ctx, entries, merged, 1)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user