mirror of
https://github.com/minio/minio.git
synced 2025-04-22 03:24:38 -04:00
do not block on send channels under high load (#19090)
all send channels must compete with `ctx` if not they will perpetually stay alive.
This commit is contained in:
parent
c7f7c47388
commit
35deb1a8e2
@ -937,7 +937,11 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||||||
onlineDisks[i] = nil
|
onlineDisks[i] = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
mrfCheck <- fi.ShallowCopy()
|
select {
|
||||||
|
case mrfCheck <- fi.ShallowCopy():
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fi, onlineMeta, onlineDisks, toObjectErr(ctx.Err(), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
return fi, onlineMeta, onlineDisks, nil
|
return fi, onlineMeta, onlineDisks, nil
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,10 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
|
|||||||
// Do not return io.EOF
|
// Do not return io.EOF
|
||||||
if resCh != nil {
|
if resCh != nil {
|
||||||
resErr = nil
|
resErr = nil
|
||||||
resCh <- results
|
select {
|
||||||
|
case resCh <- results:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
resCh = nil
|
resCh = nil
|
||||||
returned = true
|
returned = true
|
||||||
}
|
}
|
||||||
|
@ -299,7 +299,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||||||
// If directory entry on stack before this, pop it now.
|
// If directory entry on stack before this, pop it now.
|
||||||
for len(dirStack) > 0 && dirStack[len(dirStack)-1] < meta.name {
|
for len(dirStack) > 0 && dirStack[len(dirStack)-1] < meta.name {
|
||||||
pop := dirStack[len(dirStack)-1]
|
pop := dirStack[len(dirStack)-1]
|
||||||
out <- metaCacheEntry{name: pop}
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case out <- metaCacheEntry{name: pop}:
|
||||||
|
}
|
||||||
if opts.Recursive {
|
if opts.Recursive {
|
||||||
// Scan folder we found. Should be in correct sort order where we are.
|
// Scan folder we found. Should be in correct sort order where we are.
|
||||||
err := scanDir(pop)
|
err := scanDir(pop)
|
||||||
@ -368,7 +372,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
pop := dirStack[len(dirStack)-1]
|
pop := dirStack[len(dirStack)-1]
|
||||||
out <- metaCacheEntry{name: pop}
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case out <- metaCacheEntry{name: pop}:
|
||||||
|
}
|
||||||
if opts.Recursive {
|
if opts.Recursive {
|
||||||
// Scan folder we found. Should be in correct sort order where we are.
|
// Scan folder we found. Should be in correct sort order where we are.
|
||||||
logger.LogIf(ctx, scanDir(pop))
|
logger.LogIf(ctx, scanDir(pop))
|
||||||
|
@ -2869,14 +2869,22 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
|
|||||||
if req.MaxSize > 0 && int64(len(data)) > req.MaxSize {
|
if req.MaxSize > 0 && int64(len(data)) > req.MaxSize {
|
||||||
r.Exists = true
|
r.Exists = true
|
||||||
r.Error = fmt.Sprintf("max size (%d) exceeded: %d", req.MaxSize, len(data))
|
r.Error = fmt.Sprintf("max size (%d) exceeded: %d", req.MaxSize, len(data))
|
||||||
resp <- r
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case resp <- r:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
}
|
||||||
found++
|
found++
|
||||||
r.Exists = true
|
r.Exists = true
|
||||||
r.Data = data
|
r.Data = data
|
||||||
r.Modtime = mt
|
r.Modtime = mt
|
||||||
resp <- r
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case resp <- r:
|
||||||
|
}
|
||||||
if req.MaxResults > 0 && found >= req.MaxResults {
|
if req.MaxResults > 0 && found >= req.MaxResults {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user