mirror of
https://github.com/minio/minio.git
synced 2025-04-21 11:04:20 -04:00
Fix listPathRaw/WalkDir cancelation (#11905)
In #11888 we observe a lot of running, WalkDir calls. There doesn't appear to be any listerners for these calls, so they should be aborted. Ensure that WalkDir aborts when upstream cancels the request. Fixes #11888
This commit is contained in:
parent
8d5456c15a
commit
9efcb9e15c
@ -428,3 +428,13 @@ func getTLSConfig() (x509Certs []*x509.Certificate, manager *certs.Manager, secu
|
|||||||
secureConn = true
|
secureConn = true
|
||||||
return x509Certs, manager, secureConn, nil
|
return x509Certs, manager, secureConn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// contextCanceled returns whether a context is canceled.
|
||||||
|
func contextCanceled(ctx context.Context) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -811,6 +811,9 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
|||||||
if len(disks) == 0 {
|
if len(disks) == 0 {
|
||||||
return fmt.Errorf("listPathRaw: 0 drives provided")
|
return fmt.Errorf("listPathRaw: 0 drives provided")
|
||||||
}
|
}
|
||||||
|
// Cancel upstream if we finish before we expect.
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
askDisks := len(disks)
|
askDisks := len(disks)
|
||||||
readers := make([]*metacacheReader, askDisks)
|
readers := make([]*metacacheReader, askDisks)
|
||||||
@ -821,6 +824,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Make sure we close the pipe so blocked writes doesn't stay around.
|
||||||
|
defer r.CloseWithError(context.Canceled)
|
||||||
// Send request to each disk.
|
// Send request to each disk.
|
||||||
go func() {
|
go func() {
|
||||||
werr := d.WalkDir(ctx, WalkDirOptions{
|
werr := d.WalkDir(ctx, WalkDirOptions{
|
||||||
@ -832,7 +837,10 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
|||||||
ForwardTo: opts.forwardTo,
|
ForwardTo: opts.forwardTo,
|
||||||
}, w)
|
}, w)
|
||||||
w.CloseWithError(werr)
|
w.CloseWithError(werr)
|
||||||
if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
|
if werr != io.EOF && werr != nil &&
|
||||||
|
werr.Error() != errFileNotFound.Error() &&
|
||||||
|
werr.Error() != errVolumeNotFound.Error() &&
|
||||||
|
!errors.Is(werr, context.Canceled) {
|
||||||
logger.LogIf(ctx, werr)
|
logger.LogIf(ctx, werr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -107,6 +107,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||||||
forward := opts.ForwardTo
|
forward := opts.ForwardTo
|
||||||
var scanDir func(path string) error
|
var scanDir func(path string) error
|
||||||
scanDir = func(current string) error {
|
scanDir = func(current string) error {
|
||||||
|
if contextCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
|
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Folder could have gone away in-between
|
// Folder could have gone away in-between
|
||||||
@ -142,6 +145,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||||||
// Do do not retain the file.
|
// Do do not retain the file.
|
||||||
entries[i] = ""
|
entries[i] = ""
|
||||||
|
|
||||||
|
if contextCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
// If root was an object return it as such.
|
// If root was an object return it as such.
|
||||||
if HasSuffix(entry, xlStorageFormatFile) {
|
if HasSuffix(entry, xlStorageFormatFile) {
|
||||||
var meta metaCacheEntry
|
var meta metaCacheEntry
|
||||||
@ -183,6 +189,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||||||
if entry == "" {
|
if entry == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if contextCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
meta := metaCacheEntry{name: PathJoin(current, entry)}
|
meta := metaCacheEntry{name: PathJoin(current, entry)}
|
||||||
|
|
||||||
// If directory entry on stack before this, pop it now.
|
// If directory entry on stack before this, pop it now.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user