fix: incorrect context timeout during listPath() (#15509)

This PR cleans up the listing code for single drive
to ensure that we do not add an incorrect context
timeout, while resuming the listing.

fixes #15508
This commit is contained in:
Harshavardhana 2022-08-10 07:35:29 -07:00 committed by GitHub
parent 172e63dbb6
commit 74418b542a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -112,14 +112,14 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
if o.ID != "" && !o.Transient { if o.ID != "" && !o.Transient {
// Create or ping with handout... // Create or ping with handout...
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)) rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var c *metacache var c *metacache
if rpc == nil { if rpc == nil {
resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o) resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o)
c = &resp c = &resp
} else { } else {
c, err = rpc.GetMetacacheListing(ctx, *o) rctx, cancel := context.WithTimeout(ctx, 5*time.Second)
c, err = rpc.GetMetacacheListing(rctx, *o)
cancel()
} }
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
@ -318,58 +318,32 @@ func (es *erasureSingle) listPath(ctx context.Context, o *listPathOptions) (entr
// //
// If we don't have a list id we must ask the server if it has a cache or create a new. // If we don't have a list id we must ask the server if it has a cache or create a new.
if o.ID != "" && !o.Transient { if o.ID != "" && !o.Transient {
// Create or ping with handout... resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o)
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix)) c := &resp
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) if c.fileNotFound {
defer cancel() // No cache found, no entries found.
var c *metacache return entries, io.EOF
if rpc == nil {
resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o)
c = &resp
} else {
c, err = rpc.GetMetacacheListing(ctx, *o)
} }
if err != nil { if c.status == scanStateError || c.status == scanStateNone {
if errors.Is(err, context.Canceled) { o.ID = ""
// Context is canceled, return at once.
// request canceled, no entries to return
return entries, io.EOF
}
if !errors.Is(err, context.DeadlineExceeded) {
o.debugln("listPath: got error", err)
}
o.Transient = true
o.Create = false o.Create = false
o.ID = mustGetUUID() o.debugln("scan status", c.status, " - waiting a roundtrip to create")
} else { } else {
if c.fileNotFound { // Continue listing
// No cache found, no entries found. o.ID = c.id
return entries, io.EOF go func(meta metacache) {
} // Continuously update while we wait.
if c.status == scanStateError || c.status == scanStateNone { t := time.NewTicker(metacacheMaxClientWait / 10)
o.ID = "" defer t.Stop()
o.Create = false select {
o.debugln("scan status", c.status, " - waiting a roundtrip to create") case <-ctx.Done():
} else { // Request is done, stop updating.
// Continue listing return
o.ID = c.id case <-t.C:
go func(meta metacache) { meta.lastHandout = time.Now()
// Continuously update while we wait. meta, _ = localMetacacheMgr.updateCacheEntry(meta)
t := time.NewTicker(metacacheMaxClientWait / 10) }
defer t.Stop() }(*c)
select {
case <-ctx.Done():
// Request is done, stop updating.
return
case <-t.C:
meta.lastHandout = time.Now()
if rpc == nil {
meta, _ = localMetacacheMgr.updateCacheEntry(meta)
}
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
}
}(*c)
}
} }
// We have an existing list ID, continue streaming. // We have an existing list ID, continue streaming.
@ -399,21 +373,7 @@ func (es *erasureSingle) listPath(ctx context.Context, o *listPathOptions) (entr
return entries, err return entries, err
} }
entries.truncate(0) entries.truncate(0)
go func() {
rpc := globalNotificationSys.restClientFromHash(pathJoin(o.Bucket, o.Prefix))
if rpc != nil {
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
defer cancel()
c, err := rpc.GetMetacacheListing(ctx, *o)
if err == nil {
c.error = "no longer used"
c.status = scanStateError
rpc.UpdateMetacacheListing(ctx, *c)
}
}
}()
o.ID = "" o.ID = ""
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("Resuming listing from drives failed %w, proceeding to do raw listing", err)) logger.LogIf(ctx, fmt.Errorf("Resuming listing from drives failed %w, proceeding to do raw listing", err))
} }