diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 8818463b5..c78a1e7f3 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -186,9 +186,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( rpc := globalNotificationSys.restClientFromHash(o.Bucket) ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) defer cancel() - if c, err := rpc.GetMetacacheListing(ctx, *o); err == nil { + c, err := rpc.GetMetacacheListing(ctx, *o) + if err == nil { c.error = "no longer used" c.status = scanStateError + rpc.UpdateMetacacheListing(ctx, *c) } }() o.ID = "" @@ -199,8 +201,8 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( // Create filter for results. o.debugln("Raw List", o) filterCh := make(chan metaCacheEntry, o.Limit) - filteredResults := o.gatherResults(filterCh) listCtx, cancelList := context.WithCancel(ctx) + filteredResults := o.gatherResults(listCtx, filterCh) var wg sync.WaitGroup wg.Add(1) var listErr error @@ -343,7 +345,7 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions inCh := make(chan metaCacheEntry, metacacheBlockSize) outCh := make(chan metaCacheEntry, o.Limit) - filteredResults := o.gatherResults(outCh) + filteredResults := o.gatherResults(ctx, outCh) mc := o.newMetacache() meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(o.Bucket), o: *o} diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index ef2099ed1..811e47489 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -131,20 +131,31 @@ func (o *listPathOptions) debugln(data ...interface{}) { // gatherResults will collect all results on the input channel and filter results according to the options. // Caller should close the channel when done. -// The returned function will return the results once there is enough or input is closed. -func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) { +// The returned function will return the results once there is enough or input is closed, +// or the context is canceled. +func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) { var resultsDone = make(chan metaCacheEntriesSorted) // Copy so we can mutate resCh := resultsDone + var done bool + var mu sync.Mutex resErr := io.EOF go func() { var results metaCacheEntriesSorted + var returned bool for entry := range in { - if resCh == nil { + if returned { // past limit continue } + mu.Lock() + returned = done + mu.Unlock() + if returned { + resCh = nil + continue + } if !o.IncludeDirectories && entry.isDir() { continue } @@ -167,6 +178,7 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa resErr = nil resCh <- results resCh = nil + returned = true } continue } @@ -178,7 +190,15 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa } }() return func() (metaCacheEntriesSorted, error) { - return <-resultsDone, resErr + select { + case <-ctx.Done(): + mu.Lock() + done = true + mu.Unlock() + return metaCacheEntriesSorted{}, ctx.Err() + case r := <-resultsDone: + return r, resErr + } } }