From 74418b542a4a1e0eb27375060d0982927899ee2b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 10 Aug 2022 07:35:29 -0700 Subject: [PATCH] fix: incorrect context timeout during listPath() (#15509) This PR cleans up the listing code for single drive to ensure that we do not add an incorrect context timeout, while resuming the listing. fixes #15508 --- cmd/metacache-server-pool.go | 92 ++++++++++-------------------------- 1 file changed, 26 insertions(+), 66 deletions(-) diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 26c19b748..91f572b76 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -112,14 +112,14 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( if o.ID != "" && !o.Transient { // Create or ping with handout... rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() var c *metacache if rpc == nil { resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o) c = &resp } else { - c, err = rpc.GetMetacacheListing(ctx, *o) + rctx, cancel := context.WithTimeout(ctx, 5*time.Second) + c, err = rpc.GetMetacacheListing(rctx, *o) + cancel() } if err != nil { if errors.Is(err, context.Canceled) { @@ -318,58 +318,32 @@ func (es *erasureSingle) listPath(ctx context.Context, o *listPathOptions) (entr // // If we don't have a list id we must ask the server if it has a cache or create a new. if o.ID != "" && !o.Transient { - // Create or ping with handout... - rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - var c *metacache - if rpc == nil { - resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o) - c = &resp - } else { - c, err = rpc.GetMetacacheListing(ctx, *o) + resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o) + c := &resp + if c.fileNotFound { + // No cache found, no entries found. + return entries, io.EOF } - if err != nil { - if errors.Is(err, context.Canceled) { - // Context is canceled, return at once. - // request canceled, no entries to return - return entries, io.EOF - } - if !errors.Is(err, context.DeadlineExceeded) { - o.debugln("listPath: got error", err) - } - o.Transient = true + if c.status == scanStateError || c.status == scanStateNone { + o.ID = "" o.Create = false - o.ID = mustGetUUID() + o.debugln("scan status", c.status, " - waiting a roundtrip to create") } else { - if c.fileNotFound { - // No cache found, no entries found. - return entries, io.EOF - } - if c.status == scanStateError || c.status == scanStateNone { - o.ID = "" - o.Create = false - o.debugln("scan status", c.status, " - waiting a roundtrip to create") - } else { - // Continue listing - o.ID = c.id - go func(meta metacache) { - // Continuously update while we wait. - t := time.NewTicker(metacacheMaxClientWait / 10) - defer t.Stop() - select { - case <-ctx.Done(): - // Request is done, stop updating. - return - case <-t.C: - meta.lastHandout = time.Now() - if rpc == nil { - meta, _ = localMetacacheMgr.updateCacheEntry(meta) - } - meta, _ = rpc.UpdateMetacacheListing(ctx, meta) - } - }(*c) - } + // Continue listing + o.ID = c.id + go func(meta metacache) { + // Continuously update while we wait. + t := time.NewTicker(metacacheMaxClientWait / 10) + defer t.Stop() + select { + case <-ctx.Done(): + // Request is done, stop updating. + return + case <-t.C: + meta.lastHandout = time.Now() + meta, _ = localMetacacheMgr.updateCacheEntry(meta) + } + }(*c) } // We have an existing list ID, continue streaming. @@ -399,21 +373,7 @@ func (es *erasureSingle) listPath(ctx context.Context, o *listPathOptions) (entr return entries, err } entries.truncate(0) - go func() { - rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)) - if rpc != nil { - ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - defer cancel() - c, err := rpc.GetMetacacheListing(ctx, *o) - if err == nil { - c.error = "no longer used" - c.status = scanStateError - rpc.UpdateMetacacheListing(ctx, *c) - } - } - }() o.ID = "" - if err != nil { logger.LogIf(ctx, fmt.Errorf("Resuming listing from drives failed %w, proceeding to do raw listing", err)) }