mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Fix ListObjects aborting after 3 minute on async request (#20074)
When creating the async listing, if the first request does not return within 3 minutes, it is stopped, since it isn't being kept alive. Keep updating `lastHandout` while we are waiting for the initial request to be fulfilled.
This commit is contained in:
parent
989c318a28
commit
83adc2eebf
@ -153,19 +153,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
|
|||||||
} else {
|
} else {
|
||||||
// Continue listing
|
// Continue listing
|
||||||
o.ID = c.id
|
o.ID = c.id
|
||||||
go func(meta metacache) {
|
go c.keepAlive(ctx, rpc)
|
||||||
// Continuously update while we wait.
|
|
||||||
t := time.NewTicker(metacacheMaxClientWait / 10)
|
|
||||||
defer t.Stop()
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// Request is done, stop updating.
|
|
||||||
return
|
|
||||||
case <-t.C:
|
|
||||||
meta.lastHandout = time.Now()
|
|
||||||
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
|
|
||||||
}
|
|
||||||
}(*c)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -219,6 +207,9 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
|
|||||||
o.ID = ""
|
o.ID = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if contextCanceled(ctx) {
|
||||||
|
return entries, ctx.Err()
|
||||||
|
}
|
||||||
// Do listing in-place.
|
// Do listing in-place.
|
||||||
// Create output for our results.
|
// Create output for our results.
|
||||||
// Create filter for results.
|
// Create filter for results.
|
||||||
@ -449,5 +440,10 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
|
|||||||
xioutil.SafeClose(saveCh)
|
xioutil.SafeClose(saveCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return filteredResults()
|
entries, err = filteredResults()
|
||||||
|
if err == nil {
|
||||||
|
// Check if listing recorded an error.
|
||||||
|
err = meta.getErr()
|
||||||
|
}
|
||||||
|
return entries, err
|
||||||
}
|
}
|
||||||
|
@ -805,6 +805,17 @@ func (m *metaCacheRPC) setErr(err string) {
|
|||||||
*m.meta = meta
|
*m.meta = meta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getErr will return an error if the listing failed.
|
||||||
|
// The error is not type safe.
|
||||||
|
func (m *metaCacheRPC) getErr() error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if m.meta.status == scanStateError {
|
||||||
|
return errors.New(m.meta.error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
|
func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
|
||||||
o := mc.o
|
o := mc.o
|
||||||
o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)
|
o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)
|
||||||
|
@ -24,6 +24,8 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/minio/pkg/v3/console"
|
||||||
)
|
)
|
||||||
|
|
||||||
type scanStatus uint8
|
type scanStatus uint8
|
||||||
@ -97,6 +99,37 @@ func (m *metacache) worthKeeping() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// keepAlive will continuously update lastHandout until ctx is canceled.
|
||||||
|
func (m metacache) keepAlive(ctx context.Context, rpc *peerRESTClient) {
|
||||||
|
// we intentionally operate on a copy of m, so we can update without locks.
|
||||||
|
t := time.NewTicker(metacacheMaxClientWait / 10)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Request is done, stop updating.
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
m.lastHandout = time.Now()
|
||||||
|
|
||||||
|
if m2, err := rpc.UpdateMetacacheListing(ctx, m); err == nil {
|
||||||
|
if m2.status != scanStateStarted {
|
||||||
|
if serverDebugLog {
|
||||||
|
console.Debugln("returning", m.id, "due to scan state", m2.status, time.Now().Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m = m2
|
||||||
|
if serverDebugLog {
|
||||||
|
console.Debugln("refreshed", m.id, time.Now().Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
} else if serverDebugLog {
|
||||||
|
console.Debugln("error refreshing", m.id, time.Now().Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// baseDirFromPrefix will return the base directory given an object path.
|
// baseDirFromPrefix will return the base directory given an object path.
|
||||||
// For example an object with name prefix/folder/object.ext will return `prefix/folder/`.
|
// For example an object with name prefix/folder/object.ext will return `prefix/folder/`.
|
||||||
func baseDirFromPrefix(prefix string) string {
|
func baseDirFromPrefix(prefix string) string {
|
||||||
@ -116,13 +149,17 @@ func baseDirFromPrefix(prefix string) string {
|
|||||||
// update cache with new status.
|
// update cache with new status.
|
||||||
// The updates are conditional so multiple callers can update with different states.
|
// The updates are conditional so multiple callers can update with different states.
|
||||||
func (m *metacache) update(update metacache) {
|
func (m *metacache) update(update metacache) {
|
||||||
m.lastUpdate = UTCNow()
|
now := UTCNow()
|
||||||
|
m.lastUpdate = now
|
||||||
|
|
||||||
if m.lastHandout.After(m.lastHandout) {
|
if update.lastHandout.After(m.lastHandout) {
|
||||||
m.lastHandout = UTCNow()
|
m.lastHandout = update.lastUpdate
|
||||||
|
if m.lastHandout.After(now) {
|
||||||
|
m.lastHandout = now
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if m.status == scanStateStarted && update.status == scanStateSuccess {
|
if m.status == scanStateStarted && update.status == scanStateSuccess {
|
||||||
m.ended = UTCNow()
|
m.ended = now
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.status == scanStateStarted && update.status != scanStateStarted {
|
if m.status == scanStateStarted && update.status != scanStateStarted {
|
||||||
@ -138,7 +175,7 @@ func (m *metacache) update(update metacache) {
|
|||||||
if m.error == "" && update.error != "" {
|
if m.error == "" && update.error != "" {
|
||||||
m.error = update.error
|
m.error = update.error
|
||||||
m.status = scanStateError
|
m.status = scanStateError
|
||||||
m.ended = UTCNow()
|
m.ended = now
|
||||||
}
|
}
|
||||||
m.fileNotFound = m.fileNotFound || update.fileNotFound
|
m.fileNotFound = m.fileNotFound || update.fileNotFound
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user