diff --git a/cmd/dynamic-timeouts.go b/cmd/dynamic-timeouts.go
index 1815bc61d..becb3b879 100644
--- a/cmd/dynamic-timeouts.go
+++ b/cmd/dynamic-timeouts.go
@@ -17,6 +17,7 @@ package cmd
 
 import (
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,7 +27,8 @@ const (
 	dynamicTimeoutIncreaseThresholdPct = 0.33 // Upper threshold for failures in order to increase timeout
 	dynamicTimeoutDecreaseThresholdPct = 0.10 // Lower threshold for failures in order to decrease timeout
 	dynamicTimeoutLogSize              = 16
-	maxDuration                        = 1<<63 - 1
+	maxDuration                        = math.MaxInt64
+	maxDynamicTimeout                  = 24 * time.Hour // Never set timeout bigger than this.
 )
 
 // timeouts that are dynamically adapted based on actual usage results
@@ -40,6 +42,12 @@ type dynamicTimeout struct {
 
 // newDynamicTimeout returns a new dynamic timeout initialized with timeout value
 func newDynamicTimeout(timeout, minimum time.Duration) *dynamicTimeout {
+	if timeout <= 0 || minimum <= 0 {
+		panic("newDynamicTimeout: negative or zero timeout")
+	}
+	if minimum > timeout {
+		minimum = timeout
+	}
 	return &dynamicTimeout{timeout: int64(timeout), minimum: int64(minimum)}
 }
 
@@ -61,60 +69,73 @@ func (dt *dynamicTimeout) LogFailure() {
 
 // logEntry stores a log entry
 func (dt *dynamicTimeout) logEntry(duration time.Duration) {
+	if duration < 0 {
+		return
+	}
 	entries := int(atomic.AddInt64(&dt.entries, 1))
 	index := entries - 1
 	if index < dynamicTimeoutLogSize {
 		dt.mutex.Lock()
 		dt.log[index] = duration
+
+		// We leak entries while we copy
+		if entries == dynamicTimeoutLogSize {
+
+			// Make copy on stack in order to call adjust()
+			logCopy := [dynamicTimeoutLogSize]time.Duration{}
+			copy(logCopy[:], dt.log[:])
+
+			// reset log entries
+			atomic.StoreInt64(&dt.entries, 0)
+			dt.mutex.Unlock()
+
+			dt.adjust(logCopy)
+			return
+		}
 		dt.mutex.Unlock()
 	}
-	if entries == dynamicTimeoutLogSize {
-		dt.mutex.Lock()
-
-		// Make copy on stack in order to call adjust()
-		logCopy := [dynamicTimeoutLogSize]time.Duration{}
-		copy(logCopy[:], dt.log[:])
-
-		// reset log entries
-		atomic.StoreInt64(&dt.entries, 0)
-
-		dt.mutex.Unlock()
-
-		dt.adjust(logCopy)
-	}
 }
 
 // adjust changes the value of the dynamic timeout based on the
 // previous results
 func (dt *dynamicTimeout) adjust(entries [dynamicTimeoutLogSize]time.Duration) {
-
-	failures, average := 0, int64(0)
-	for i := 0; i < len(entries); i++ {
-		if entries[i] == maxDuration {
+	failures, max := 0, time.Duration(0)
+	for _, dur := range entries[:] {
+		if dur == maxDuration {
 			failures++
-		} else {
-			average += int64(entries[i])
+		} else if dur > max {
+			max = dur
 		}
 	}
-	if failures < len(entries) {
-		average /= int64(len(entries) - failures)
-	}
 
-	timeOutHitPct := float64(failures) / float64(len(entries))
+	failPct := float64(failures) / float64(len(entries))
 
-	if timeOutHitPct > dynamicTimeoutIncreaseThresholdPct {
+	if failPct > dynamicTimeoutIncreaseThresholdPct {
 		// We are hitting the timeout too often, so increase the timeout by 25%
 		timeout := atomic.LoadInt64(&dt.timeout) * 125 / 100
-		atomic.StoreInt64(&dt.timeout, timeout)
-	} else if timeOutHitPct < dynamicTimeoutDecreaseThresholdPct {
-		// We are hitting the timeout relatively few times, so decrease the timeout
-		average = average * 125 / 100 // Add buffer of 25% on top of average
-		timeout := (atomic.LoadInt64(&dt.timeout) + average) / 2 // Middle between current timeout and average success
+		// Set upper cap.
+		if timeout > int64(maxDynamicTimeout) {
+			timeout = int64(maxDynamicTimeout)
+		}
+		// Safety, shouldn't happen
+		if timeout < dt.minimum {
+			timeout = dt.minimum
+		}
+		atomic.StoreInt64(&dt.timeout, timeout)
+	} else if failPct < dynamicTimeoutDecreaseThresholdPct {
+		// We are hitting the timeout relatively few times,
+		// so decrease the timeout towards 25% above the maximum time spent.
+		max = max * 125 / 100
+
+		timeout := atomic.LoadInt64(&dt.timeout)
+		if max < time.Duration(timeout) {
+			// Move 50% toward the max.
+			timeout = (int64(max) + timeout) / 2
+		}
 		if timeout < dt.minimum {
 			timeout = dt.minimum
 		}
 		atomic.StoreInt64(&dt.timeout, timeout)
 	}
-
 }
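The decrease branch above no longer averages successful durations: it tracks only the slowest success, pads it by 25%, and moves the current timeout halfway toward that padded value. A standalone sketch of the arithmetic (not part of the patch; the sample values are illustrative):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	// Assumed example: current timeout is 1 minute, and the slowest of
    	// the last 16 successful calls took 4 seconds.
    	current := int64(time.Minute)
    	max := 4 * time.Second

    	max = max * 125 / 100 // 25% buffer on top of the observed maximum: 5s
    	if int64(max) < current {
    		// Move halfway from the current timeout toward the buffered max.
    		current = (int64(max) + current) / 2
    	}
    	fmt.Println(time.Duration(current)) // 32.5s
    }

Each further window of 16 calm entries halves the remaining distance, so the timeout decays geometrically toward the padded maximum instead of toward an average that a single slow outlier could drag around.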
diff --git a/cmd/dynamic-timeouts_test.go b/cmd/dynamic-timeouts_test.go
index 699c7f8af..229e2b445 100644
--- a/cmd/dynamic-timeouts_test.go
+++ b/cmd/dynamic-timeouts_test.go
@@ -18,6 +18,8 @@ package cmd
 
 import (
 	"math/rand"
+	"runtime"
+	"sync"
 	"testing"
 	"time"
 )
@@ -98,7 +100,7 @@ func TestDynamicTimeoutDualDecrease(t *testing.T) {
 	adjustedAgain := timeout.Timeout()
 
 	if initial <= adjusted || adjusted <= adjustedAgain {
-		t.Errorf("Failure to decrease timeout multiple times")
+		t.Errorf("Failure to decrease timeout multiple times, initial: %v, adjusted: %v, again: %v", initial, adjusted, adjustedAgain)
 	}
 }
 
@@ -123,6 +125,30 @@ func TestDynamicTimeoutManyDecreases(t *testing.T) {
 	}
 }
 
+func TestDynamicTimeoutConcurrent(t *testing.T) {
+	// Race test.
+	timeout := newDynamicTimeout(time.Second, time.Millisecond)
+	var wg sync.WaitGroup
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+		wg.Add(1)
+		rng := rand.New(rand.NewSource(int64(i)))
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 100; i++ {
+				timeout.LogFailure()
+				for j := 0; j < 100; j++ {
+					timeout.LogSuccess(time.Duration(float64(time.Second) * rng.Float64()))
+				}
+				to := timeout.Timeout()
+				if to < time.Millisecond || to > time.Second {
+					panic(to)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
 func TestDynamicTimeoutHitMinimum(t *testing.T) {
 	const minimum = 30 * time.Second
 
@@ -168,7 +194,7 @@ func TestDynamicTimeoutAdjustExponential(t *testing.T) {
 
 	timeout := newDynamicTimeout(time.Minute, time.Second)
 
-	rand.Seed(time.Now().UTC().UnixNano())
+	rand.Seed(0)
 
 	initial := timeout.Timeout()
 
@@ -188,7 +214,7 @@ func TestDynamicTimeoutAdjustNormalized(t *testing.T) {
 
 	timeout := newDynamicTimeout(time.Minute, time.Second)
 
-	rand.Seed(time.Now().UTC().UnixNano())
+	rand.Seed(0)
 
 	initial := timeout.Timeout()
 
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index da1a55ce4..987ac5d68 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -31,7 +31,7 @@ const (
 	leaderLockTimeoutSleepInterval = time.Hour
 )
 
-var leaderLockTimeout = newDynamicTimeout(30*time.Second, time.Minute)
+var leaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second)
 
 // NewBgHealSequence creates a background healing sequence
 // operation which crawls all objects and heal them.
diff --git a/cmd/iam.go b/cmd/iam.go
index bf5ec6d8e..e16e2b8fe 100644
--- a/cmd/iam.go
+++ b/cmd/iam.go
@@ -450,7 +450,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
 	wquorum := InsufficientWriteQuorum{}
 
 	// allocate dynamic timeout once before the loop
-	iamLockTimeout := newDynamicTimeout(3*time.Second, 5*time.Second)
+	iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
 
 	for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
 		// let one of the server acquire the lock, if not let them timeout.
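For context, callers drive the type the same way the new concurrent test does: read Timeout(), attempt the guarded operation, then report the outcome. A hedged sketch of that loop, assuming it sits in package cmd next to the type; doWork is a hypothetical stand-in for the lock acquisition or I/O being bounded:

    func runWithDynamicTimeout(dt *dynamicTimeout, doWork func(time.Duration) error) error {
    	started := time.Now()
    	if err := doWork(dt.Timeout()); err != nil {
    		// A failure is logged as maxDuration and counts toward the 33%
    		// threshold that triggers the 25% increase in adjust().
    		dt.LogFailure()
    		return err
    	}
    	// Successful durations feed the decrease logic once 16 entries accumulate.
    	dt.LogSuccess(time.Since(started))
    	return nil
    }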
diff --git a/cmd/server-main.go b/cmd/server-main.go
index 225414b1c..61d014ebf 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -231,7 +231,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
 	}
 
 	// allocate dynamic timeout once before the loop
-	configLockTimeout := newDynamicTimeout(3*time.Second, 5*time.Second)
+	configLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
 
 	// **** WARNING ****
 	// Migrating to encrypted backend should happen before initialization of any
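The three call-site changes above all fix the same reversed argument order: the signature is newDynamicTimeout(timeout, minimum), so a call like (3*time.Second, 5*time.Second) declared a 5s floor above a 3s timeout. With the new clamp in the constructor, the reversed form now degrades gracefully instead of silently running with minimum > timeout. An illustrative comparison (a snippet assumed to run inside package cmd):

    reversed := newDynamicTimeout(3*time.Second, 5*time.Second) // minimum clamped down to 3s
    fixed := newDynamicTimeout(5*time.Second, 3*time.Second)    // 5s timeout, 3s floor, as intended
    fmt.Println(reversed.Timeout(), fixed.Timeout())            // 3s 5s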