mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
fix: walk() should cancel itself upon context cancellation (#15553)
This PR fixes possible leaks that may emanate from not listening on context cancelation or timeouts. ``` goroutine 60957610 [chan send, 16 minutes]: github.com/minio/minio/cmd.(*erasureServerPools).Walk.func1.1.1(...) github.com/minio/minio/cmd/erasure-server-pool.go:1724 +0x368 github.com/minio/minio/cmd.listPathRaw({0x4a9a740, 0xc0666dffc0},... github.com/minio/minio/cmd/metacache-set.go:1022 +0xfc4 github.com/minio/minio/cmd.(*erasureServerPools).Walk.func1.1() github.com/minio/minio/cmd/erasure-server-pool.go:1764 +0x528 created by github.com/minio/minio/cmd.(*erasureServerPools).Walk.func1 github.com/minio/minio/cmd/erasure-server-pool.go:1697 +0x1b7 ```
This commit is contained in:
parent
d350b666ff
commit
e9055e9ef7
@ -2053,9 +2053,9 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk through all object versions - note ascending order of walk needed to ensure delete marker replicated to
|
// Walk through all object versions - Walk() is always in ascending order needed to ensure
|
||||||
// target after object version is first created.
|
// delete marker replicated to target after object version is first created.
|
||||||
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkAscending: true}); err != nil {
|
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{}); err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1871,18 +1871,17 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if opts.WalkAscending {
|
|
||||||
for i := len(fivs.Versions) - 1; i >= 0; i-- {
|
|
||||||
version := fivs.Versions[i]
|
|
||||||
versioned := vcfg != nil && vcfg.Versioned(version.Name)
|
|
||||||
|
|
||||||
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
versionsSorter(fivs.Versions).reverse()
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, version := range fivs.Versions {
|
for _, version := range fivs.Versions {
|
||||||
versioned := vcfg != nil && vcfg.Versioned(version.Name)
|
versioned := vcfg != nil && vcfg.Versioned(version.Name)
|
||||||
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case results <- version.ToObjectInfo(bucket, version.Name, versioned):
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1924,6 +1923,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
|
|
||||||
if err := listPathRaw(ctx, lopts); err != nil {
|
if err := listPathRaw(ctx, lopts); err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
||||||
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -3007,13 +3007,13 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vcfg, _ := globalBucketVersioningSys.Get(bucket)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
defer close(results)
|
defer close(results)
|
||||||
|
|
||||||
versioned := opts.Versioned || opts.VersionSuspended
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -3028,15 +3028,17 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
|
|||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if opts.WalkAscending {
|
|
||||||
for i := len(fivs.Versions) - 1; i >= 0; i-- {
|
versionsSorter(fivs.Versions).reverse()
|
||||||
version := fivs.Versions[i]
|
|
||||||
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, version := range fivs.Versions {
|
for _, version := range fivs.Versions {
|
||||||
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
versioned := vcfg != nil && vcfg.Versioned(version.Name)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case results <- version.ToObjectInfo(bucket, version.Name, versioned):
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3078,6 +3080,7 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
|
|||||||
|
|
||||||
if err := listPathRaw(ctx, lopts); err != nil {
|
if err := listPathRaw(ctx, lopts); err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
||||||
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -79,7 +79,6 @@ type ObjectOptions struct {
|
|||||||
// mainly set for certain WRITE operations.
|
// mainly set for certain WRITE operations.
|
||||||
SkipDecommissioned bool
|
SkipDecommissioned bool
|
||||||
|
|
||||||
WalkAscending bool // return Walk results in ascending order of versions
|
|
||||||
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
|
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
|
||||||
|
|
||||||
// IndexCB will return any index created but the compression.
|
// IndexCB will return any index created but the compression.
|
||||||
|
Loading…
Reference in New Issue
Block a user