Return list request when canceled (#12977)

* Return list request when canceled
* Cancel list if abandoned
This commit is contained in:
Klaus Post 2021-08-16 20:59:16 +02:00 committed by GitHub
parent 92bb2928e4
commit ad928f0078
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 7 deletions

View File

@ -186,9 +186,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
rpc := globalNotificationSys.restClientFromHash(o.Bucket) rpc := globalNotificationSys.restClientFromHash(o.Bucket)
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
defer cancel() defer cancel()
if c, err := rpc.GetMetacacheListing(ctx, *o); err == nil { c, err := rpc.GetMetacacheListing(ctx, *o)
if err == nil {
c.error = "no longer used" c.error = "no longer used"
c.status = scanStateError c.status = scanStateError
rpc.UpdateMetacacheListing(ctx, *c)
} }
}() }()
o.ID = "" o.ID = ""
@ -199,8 +201,8 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
// Create filter for results. // Create filter for results.
o.debugln("Raw List", o) o.debugln("Raw List", o)
filterCh := make(chan metaCacheEntry, o.Limit) filterCh := make(chan metaCacheEntry, o.Limit)
filteredResults := o.gatherResults(filterCh)
listCtx, cancelList := context.WithCancel(ctx) listCtx, cancelList := context.WithCancel(ctx)
filteredResults := o.gatherResults(listCtx, filterCh)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
var listErr error var listErr error
@ -343,7 +345,7 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
inCh := make(chan metaCacheEntry, metacacheBlockSize) inCh := make(chan metaCacheEntry, metacacheBlockSize)
outCh := make(chan metaCacheEntry, o.Limit) outCh := make(chan metaCacheEntry, o.Limit)
filteredResults := o.gatherResults(outCh) filteredResults := o.gatherResults(ctx, outCh)
mc := o.newMetacache() mc := o.newMetacache()
meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(o.Bucket), o: *o} meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(o.Bucket), o: *o}

View File

@ -131,20 +131,31 @@ func (o *listPathOptions) debugln(data ...interface{}) {
// gatherResults will collect all results on the input channel and filter results according to the options. // gatherResults will collect all results on the input channel and filter results according to the options.
// Caller should close the channel when done. // Caller should close the channel when done.
// The returned function will return the results once there is enough or input is closed. // The returned function will return the results once there is enough or input is closed,
func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) { // or the context is canceled.
func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) {
var resultsDone = make(chan metaCacheEntriesSorted) var resultsDone = make(chan metaCacheEntriesSorted)
// Copy so we can mutate // Copy so we can mutate
resCh := resultsDone resCh := resultsDone
var done bool
var mu sync.Mutex
resErr := io.EOF resErr := io.EOF
go func() { go func() {
var results metaCacheEntriesSorted var results metaCacheEntriesSorted
var returned bool
for entry := range in { for entry := range in {
if resCh == nil { if returned {
// past limit // past limit
continue continue
} }
mu.Lock()
returned = done
mu.Unlock()
if returned {
resCh = nil
continue
}
if !o.IncludeDirectories && entry.isDir() { if !o.IncludeDirectories && entry.isDir() {
continue continue
} }
@ -167,6 +178,7 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa
resErr = nil resErr = nil
resCh <- results resCh <- results
resCh = nil resCh = nil
returned = true
} }
continue continue
} }
@ -178,7 +190,15 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa
} }
}() }()
return func() (metaCacheEntriesSorted, error) { return func() (metaCacheEntriesSorted, error) {
return <-resultsDone, resErr select {
case <-ctx.Done():
mu.Lock()
done = true
mu.Unlock()
return metaCacheEntriesSorted{}, ctx.Err()
case r := <-resultsDone:
return r, resErr
}
} }
} }