From 88c1bb07202fd9642675ec1760704a611199dff7 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 5 Feb 2021 19:23:48 -0800
Subject: [PATCH] fix: improper ticker usage in goroutines (#11468)

- lock maintenance loop was sleeping incorrectly and misusing its ticker,
  triggering extra expiration routines that could flood the network.

- multipart upload cleanup should be based on a timer instead of a ticker,
  to ensure that long-running jobs don't get triggered twice.

- make sure to get the right lockers for the object name

---
 cmd/erasure-sets.go      |  9 +++++---
 cmd/fs-v1-multipart.go   |  9 +++++---
 cmd/local-locker.go      |  7 +++++--
 cmd/lock-rest-server.go  | 44 ++++++++++++++++++++--------------------
 cmd/metacache-manager.go |  2 ++
 pkg/dsync/drwmutex.go    |  8 ++------
 6 files changed, 43 insertions(+), 36 deletions(-)

diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 65071691a..05db578e1 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -423,14 +423,17 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 }
 
 func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-	ticker := time.NewTicker(cleanupInterval)
-	defer ticker.Stop()
+	timer := time.NewTimer(cleanupInterval)
+	defer timer.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-ticker.C:
+		case <-timer.C:
+			// Reset for the next interval
+			timer.Reset(cleanupInterval)
+
 			for _, set := range s.sets {
 				set.cleanupStaleUploads(ctx, expiry)
 			}
diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 7744c07b4..9b8e7b146 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -849,14 +849,17 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
 func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-	ticker := time.NewTicker(cleanupInterval)
-	defer ticker.Stop()
+	timer := time.NewTimer(cleanupInterval)
+	defer timer.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-ticker.C:
+		case <-timer.C:
+			// Reset for the next interval
+			timer.Reset(cleanupInterval)
+
 			now := time.Now()
 			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
 			if err != nil {
diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index cd42f44df..d0702b727 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -249,7 +249,10 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
 	// Lock found, proceed to verify if belongs to given uid.
 	lri, ok := l.lockMap[resource]
 	if !ok {
-		return true, nil
+		// lock doesn't exist yet, so there is nothing
+		// to expire - the request may simply be racing
+		// with other active lock requests.
+		return false, nil
 	}
 
 	// Check whether uid is still active
@@ -258,7 +261,7 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
 		ep := globalRemoteEndpoints[args.Owner]
 		if !ep.IsLocal {
 			// check if the owner is online
-			return isServerResolvable(ep, 1*time.Second) != nil, nil
+			return isServerResolvable(ep, 3*time.Second) != nil, nil
 		}
 		return false, nil
 	}
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index cb24bbd30..757e4bcf1 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -33,7 +33,7 @@ import (
 
 const (
 	// Lock maintenance interval.
-	lockMaintenanceInterval = 10 * time.Second
+	lockMaintenanceInterval = 15 * time.Second
 
 	// Lock validity check interval.
 	lockValidityCheckInterval = 30 * time.Second
@@ -254,15 +254,15 @@ func getLongLivedLocks(interval time.Duration) []lockRequesterInfo {
 // - some network error (and server is up normally)
 //
 // We will ignore the error, and we will retry later to get a resolve on this lock
-func lockMaintenance(ctx context.Context, interval time.Duration) error {
+func lockMaintenance(ctx context.Context, interval time.Duration) {
 	objAPI := newObjectLayerFn()
 	if objAPI == nil {
-		return nil
+		return
 	}
 
 	z, ok := objAPI.(*erasureServerPools)
 	if !ok {
-		return nil
+		return
 	}
 
 	type nlock struct {
@@ -295,14 +295,15 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 		if lrip.Group {
 			lockers, _ = z.serverPools[0].getHashedSet("").getLockers()
 		} else {
-			lockers, _ = z.serverPools[0].getHashedSet(lrip.Name).getLockers()
+			_, objName := path2BucketObject(lrip.Name)
+			lockers, _ = z.serverPools[0].getHashedSet(objName).getLockers()
 		}
 		var wg sync.WaitGroup
 		wg.Add(len(lockers))
 		for _, c := range lockers {
 			go func(lrip lockRequesterInfo, c dsync.NetLocker) {
 				defer wg.Done()
-				ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
+				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 
 				// Call back to all participating servers, verify
 				// if each of those servers think lock is still
@@ -336,8 +337,6 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 			globalLockServer.removeEntryIfExists(lrip)
 		}
 	}
-
-	return nil
 }
 
 // Start lock maintenance from all lock servers.
@@ -354,27 +353,28 @@ func startLockMaintenance(ctx context.Context) {
 		break
 	}
 
-	// Initialize a new ticker with a minute between each ticks.
-	ticker := time.NewTicker(lockMaintenanceInterval)
-	// Stop the timer upon service closure and cleanup the go-routine.
-	defer ticker.Stop()
+	// Initialize a one-shot timer, re-armed at the top of each cycle below.
+	lkTimer := time.NewTimer(lockMaintenanceInterval)
+	// Stop the timer upon returning.
+	defer lkTimer.Stop()
 
 	r := rand.New(rand.NewSource(UTCNow().UnixNano()))
+
+	// Start with a random sleep time, so as to avoid
+	// "synchronous checks" between servers
+	duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
+	time.Sleep(duration)
+
 	for {
 		// Verifies every minute for locks held more than 2 minutes.
 		select {
 		case <-ctx.Done():
 			return
-		case <-ticker.C:
-			// Start with random sleep time, so as to avoid
-			// "synchronous checks" between servers
-			duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
-			time.Sleep(duration)
-			if err := lockMaintenance(ctx, lockValidityCheckInterval); err != nil {
-				// Sleep right after an error.
-				duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
-				time.Sleep(duration)
-			}
+		case <-lkTimer.C:
+			// Reset the timer for the next cycle.
+			lkTimer.Reset(time.Duration(r.Float64() * float64(lockMaintenanceInterval)))
+
+			lockMaintenance(ctx, lockValidityCheckInterval)
 		}
 	}
 }
diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go
index 099a9c6f9..ed1d2432e 100644
--- a/cmd/metacache-manager.go
+++ b/cmd/metacache-manager.go
@@ -64,6 +64,8 @@ func (m *metacacheManager) initManager() {
 	}
 
 	t := time.NewTicker(time.Minute)
+	defer t.Stop()
+
 	var exit bool
 	bg := context.Background()
 	for !exit {
diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go
index 822ee63a5..a11ca005e 100644
--- a/pkg/dsync/drwmutex.go
+++ b/pkg/dsync/drwmutex.go
@@ -92,8 +92,7 @@ func (dm *DRWMutex) Lock(id, source string) {
 
 // Options lock options.
 type Options struct {
-	Timeout   time.Duration
-	Tolerance int
+	Timeout time.Duration
 }
 
 // GetLock tries to get a write lock on dm before the timeout elapses.
@@ -155,10 +154,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
 	defer cancel()
 
 	// Tolerance is not set, defaults to half of the locker clients.
-	tolerance := opts.Tolerance
-	if tolerance == 0 {
-		tolerance = len(restClnts) / 2
-	}
+	tolerance := len(restClnts) / 2
 
 	// Quorum is effectively = total clients subtracted with tolerance limit
 	quorum := len(restClnts) - tolerance
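
For reference, a minimal standalone sketch of the loop shape this patch moves cleanupStaleUploads and startLockMaintenance to; it is not part of the patch, and the cleanupLoop/job names are illustrative stand-ins rather than MinIO identifiers:

package main

import (
	"context"
	"time"
)

// cleanupLoop runs job once per interval using a one-shot timer that is
// explicitly re-armed on every iteration. A time.Ticker keeps firing on its
// original schedule regardless of what the loop is doing; a time.Timer fires
// exactly once per Reset, so the schedule stays under the loop's control and
// a long-running job is not chased by a queue of pending ticks.
func cleanupLoop(ctx context.Context, interval time.Duration, job func(context.Context)) {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// Re-arm the timer for the next interval.
			timer.Reset(interval)

			job(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	cleanupLoop(ctx, 500*time.Millisecond, func(context.Context) {
		// Stand-in for a long-running job such as cleanupStaleUploads
		// or lockMaintenance.
	})
}

In startLockMaintenance the patch additionally randomizes both the initial sleep and each Reset duration (r.Float64() * float64(lockMaintenanceInterval)) so that maintenance cycles on different servers don't stay in lockstep.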