From ae4ee95d253a8d5c9ffef5e67689f3aa2bdb76d5 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 19 Aug 2022 16:21:05 -0700 Subject: [PATCH] change default lock retry interval to 50ms (#15560) competing calls on the same object on versioned bucket mutating calls on the same object may unexpectedly have higher delays. This can be reproduced with a replicated bucket overwriting the same object writes, deletes repeatedly. For longer locks like scanner keep the 1sec interval --- cmd/bucket-replication.go | 16 +++++++++------- cmd/data-scanner.go | 6 +++++- cmd/dynamic-timeouts.go | 27 ++++++++++++++++++++++----- cmd/namespace-lock.go | 6 ++++-- internal/dsync/drwmutex.go | 12 +++++++++--- 5 files changed, 49 insertions(+), 18 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 3f8efd127..a0c756091 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1059,13 +1059,15 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object VersionSuspended: versionSuspended, }) if err != nil { - sendEvent(eventArgs{ - EventName: event.ObjectReplicationNotTracked, - BucketName: bucket, - Object: objInfo, - Host: "Internal: [Replication]", - }) - logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) + if !isErrObjectNotFound(err) { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + logger.LogIf(ctx, fmt.Errorf("Unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) + } return } defer func() { diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index b95d62f80..59109b65f 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -64,7 +64,11 @@ const ( var ( globalHealConfig heal.Config - dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) + 
dataScannerLeaderLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{ + timeout: 30 * time.Second, + minimum: 10 * time.Second, + retryInterval: time.Second, + }) // Sleeper values are updated when config is loaded. scannerSleeper = newDynamicSleeper(10, 10*time.Second, true) scannerCycle = uatomic.NewDuration(dataScannerStartDelay) diff --git a/cmd/dynamic-timeouts.go b/cmd/dynamic-timeouts.go index 4e16cea06..23d3b1266 100644 --- a/cmd/dynamic-timeouts.go +++ b/cmd/dynamic-timeouts.go @@ -34,11 +34,24 @@ const ( // timeouts that are dynamically adapted based on actual usage results type dynamicTimeout struct { - timeout int64 - minimum int64 - entries int64 - log [dynamicTimeoutLogSize]time.Duration - mutex sync.Mutex + timeout int64 + minimum int64 + entries int64 + log [dynamicTimeoutLogSize]time.Duration + mutex sync.Mutex + retryInterval time.Duration +} + +type dynamicTimeoutOpts struct { + timeout time.Duration + minimum time.Duration + retryInterval time.Duration +} + +func newDynamicTimeoutWithOpts(opts dynamicTimeoutOpts) *dynamicTimeout { + dt := newDynamicTimeout(opts.timeout, opts.minimum) + dt.retryInterval = opts.retryInterval + return dt } // newDynamicTimeout returns a new dynamic timeout initialized with timeout value @@ -57,6 +70,10 @@ func (dt *dynamicTimeout) Timeout() time.Duration { return time.Duration(atomic.LoadInt64(&dt.timeout)) } +func (dt *dynamicTimeout) RetryInterval() time.Duration { + return dt.retryInterval +} + // LogSuccess logs the duration of a successful action that // did not hit the timeout func (dt *dynamicTimeout) LogSuccess(duration time.Duration) { diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 19477f3e5..586d7cd8e 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -166,7 +166,8 @@ func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout newCtx, cancel := context.WithCancel(ctx) if !di.rwMutex.GetLock(newCtx, cancel, di.opsID, lockSource, dsync.Options{ - 
Timeout: timeout.Timeout(), + Timeout: timeout.Timeout(), + RetryInterval: timeout.RetryInterval(), }) { timeout.LogFailure() cancel() @@ -195,7 +196,8 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou newCtx, cancel := context.WithCancel(ctx) if !di.rwMutex.GetRLock(ctx, cancel, di.opsID, lockSource, dsync.Options{ - Timeout: timeout.Timeout(), + Timeout: timeout.Timeout(), + RetryInterval: timeout.RetryInterval(), }) { timeout.LogFailure() cancel() diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go index eeceeddeb..2c6b4c84a 100644 --- a/internal/dsync/drwmutex.go +++ b/internal/dsync/drwmutex.go @@ -59,7 +59,8 @@ const ( // dRWMutexRefreshInterval - default the interval between two refresh calls drwMutexRefreshInterval = 10 * time.Second - lockRetryInterval = 1 * time.Second + // maximum time to sleep before retrying a failed blocking lock() + lockRetryInterval = 50 * time.Millisecond drwMutexInfinite = 1<<63 - 1 ) @@ -142,7 +143,8 @@ func (dm *DRWMutex) Lock(id, source string) { // Options lock options. type Options struct { - Timeout time.Duration + Timeout time.Duration + RetryInterval time.Duration } // GetLock tries to get a write lock on dm before the timeout elapses. @@ -236,7 +238,11 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i return locked } - time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval))) + lockRetryInterval := dm.lockRetryInterval + if opts.RetryInterval > 0 { + lockRetryInterval = opts.RetryInterval + } + time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval))) } } }