diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 9e637d1cf..c31c85bbc 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -153,19 +153,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( } else { // Continue listing o.ID = c.id - go func(meta metacache) { - // Continuously update while we wait. - t := time.NewTicker(metacacheMaxClientWait / 10) - defer t.Stop() - select { - case <-ctx.Done(): - // Request is done, stop updating. - return - case <-t.C: - meta.lastHandout = time.Now() - meta, _ = rpc.UpdateMetacacheListing(ctx, meta) - } - }(*c) + go c.keepAlive(ctx, rpc) } } } @@ -219,6 +207,9 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( o.ID = "" } + if contextCanceled(ctx) { + return entries, ctx.Err() + } // Do listing in-place. // Create output for our results. // Create filter for results. @@ -449,5 +440,10 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions xioutil.SafeClose(saveCh) }() - return filteredResults() + entries, err = filteredResults() + if err == nil { + // Check if listing recorded an error. + err = meta.getErr() + } + return entries, err } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index db834261e..d4eda4dd6 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -805,6 +805,17 @@ func (m *metaCacheRPC) setErr(err string) { *m.meta = meta } +// getErr will return an error if the listing failed. +// The error is not type safe. +func (m *metaCacheRPC) getErr() error { + m.mu.Lock() + defer m.mu.Unlock() + if m.meta.status == scanStateError { + return errors.New(m.meta.error) + } + return nil +} + func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) { o := mc.o o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o) diff --git a/cmd/metacache.go b/cmd/metacache.go index 94de00da3..7f35d391f 100644 --- a/cmd/metacache.go +++ b/cmd/metacache.go @@ -24,6 +24,8 @@ import ( "path" "strings" "time" + + "github.com/minio/pkg/v3/console" ) type scanStatus uint8 @@ -97,6 +99,37 @@ func (m *metacache) worthKeeping() bool { return true } +// keepAlive will continuously update lastHandout until ctx is canceled. +func (m metacache) keepAlive(ctx context.Context, rpc *peerRESTClient) { + // we intentionally operate on a copy of m, so we can update without locks. + t := time.NewTicker(metacacheMaxClientWait / 10) + defer t.Stop() + for { + select { + case <-ctx.Done(): + // Request is done, stop updating. + return + case <-t.C: + m.lastHandout = time.Now() + + if m2, err := rpc.UpdateMetacacheListing(ctx, m); err == nil { + if m2.status != scanStateStarted { + if serverDebugLog { + console.Debugln("returning", m.id, "due to scan state", m2.status, time.Now().Format(time.RFC3339)) + } + return + } + m = m2 + if serverDebugLog { + console.Debugln("refreshed", m.id, time.Now().Format(time.RFC3339)) + } + } else if serverDebugLog { + console.Debugln("error refreshing", m.id, time.Now().Format(time.RFC3339)) + } + } + } +} + // baseDirFromPrefix will return the base directory given an object path. // For example an object with name prefix/folder/object.ext will return `prefix/folder/`. func baseDirFromPrefix(prefix string) string { @@ -116,13 +149,17 @@ func baseDirFromPrefix(prefix string) string { // update cache with new status. // The updates are conditional so multiple callers can update with different states. func (m *metacache) update(update metacache) { - m.lastUpdate = UTCNow() + now := UTCNow() + m.lastUpdate = now - if m.lastHandout.After(m.lastHandout) { - m.lastHandout = UTCNow() + if update.lastHandout.After(m.lastHandout) { + m.lastHandout = update.lastUpdate + if m.lastHandout.After(now) { + m.lastHandout = now + } } if m.status == scanStateStarted && update.status == scanStateSuccess { - m.ended = UTCNow() + m.ended = now } if m.status == scanStateStarted && update.status != scanStateStarted { @@ -138,7 +175,7 @@ func (m *metacache) update(update metacache) { if m.error == "" && update.error != "" { m.error = update.error m.status = scanStateError - m.ended = UTCNow() + m.ended = now } m.fileNotFound = m.fileNotFound || update.fileNotFound }