mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
Tweaks to dynamic locks (#10508)
* Fix cases where minimum timeout > default timeout. * Add defensive code for too small/negative timeouts. * Never set timeout below the maximum value of a request. * Protect against (unlikely) int64 wraps. * Decrease timeout slower. * Don't re-lock before copying.
This commit is contained in:
parent
6f45e303f5
commit
c851e022b7
@ -17,6 +17,7 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -26,7 +27,8 @@ const (
|
|||||||
dynamicTimeoutIncreaseThresholdPct = 0.33 // Upper threshold for failures in order to increase timeout
|
dynamicTimeoutIncreaseThresholdPct = 0.33 // Upper threshold for failures in order to increase timeout
|
||||||
dynamicTimeoutDecreaseThresholdPct = 0.10 // Lower threshold for failures in order to decrease timeout
|
dynamicTimeoutDecreaseThresholdPct = 0.10 // Lower threshold for failures in order to decrease timeout
|
||||||
dynamicTimeoutLogSize = 16
|
dynamicTimeoutLogSize = 16
|
||||||
maxDuration = 1<<63 - 1
|
maxDuration = math.MaxInt64
|
||||||
|
maxDynamicTimeout = 24 * time.Hour // Never set timeout bigger than this.
|
||||||
)
|
)
|
||||||
|
|
||||||
// timeouts that are dynamically adapted based on actual usage results
|
// timeouts that are dynamically adapted based on actual usage results
|
||||||
@ -40,6 +42,12 @@ type dynamicTimeout struct {
|
|||||||
|
|
||||||
// newDynamicTimeout returns a new dynamic timeout initialized with timeout value
|
// newDynamicTimeout returns a new dynamic timeout initialized with timeout value
|
||||||
func newDynamicTimeout(timeout, minimum time.Duration) *dynamicTimeout {
|
func newDynamicTimeout(timeout, minimum time.Duration) *dynamicTimeout {
|
||||||
|
if timeout <= 0 || minimum <= 0 {
|
||||||
|
panic("newDynamicTimeout: negative or zero timeout")
|
||||||
|
}
|
||||||
|
if minimum > timeout {
|
||||||
|
minimum = timeout
|
||||||
|
}
|
||||||
return &dynamicTimeout{timeout: int64(timeout), minimum: int64(minimum)}
|
return &dynamicTimeout{timeout: int64(timeout), minimum: int64(minimum)}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,15 +69,17 @@ func (dt *dynamicTimeout) LogFailure() {
|
|||||||
|
|
||||||
// logEntry stores a log entry
|
// logEntry stores a log entry
|
||||||
func (dt *dynamicTimeout) logEntry(duration time.Duration) {
|
func (dt *dynamicTimeout) logEntry(duration time.Duration) {
|
||||||
|
if duration < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
entries := int(atomic.AddInt64(&dt.entries, 1))
|
entries := int(atomic.AddInt64(&dt.entries, 1))
|
||||||
index := entries - 1
|
index := entries - 1
|
||||||
if index < dynamicTimeoutLogSize {
|
if index < dynamicTimeoutLogSize {
|
||||||
dt.mutex.Lock()
|
dt.mutex.Lock()
|
||||||
dt.log[index] = duration
|
dt.log[index] = duration
|
||||||
dt.mutex.Unlock()
|
|
||||||
}
|
// We leak entries while we copy
|
||||||
if entries == dynamicTimeoutLogSize {
|
if entries == dynamicTimeoutLogSize {
|
||||||
dt.mutex.Lock()
|
|
||||||
|
|
||||||
// Make copy on stack in order to call adjust()
|
// Make copy on stack in order to call adjust()
|
||||||
logCopy := [dynamicTimeoutLogSize]time.Duration{}
|
logCopy := [dynamicTimeoutLogSize]time.Duration{}
|
||||||
@ -77,44 +87,55 @@ func (dt *dynamicTimeout) logEntry(duration time.Duration) {
|
|||||||
|
|
||||||
// reset log entries
|
// reset log entries
|
||||||
atomic.StoreInt64(&dt.entries, 0)
|
atomic.StoreInt64(&dt.entries, 0)
|
||||||
|
|
||||||
dt.mutex.Unlock()
|
dt.mutex.Unlock()
|
||||||
|
|
||||||
dt.adjust(logCopy)
|
dt.adjust(logCopy)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dt.mutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// adjust changes the value of the dynamic timeout based on the
|
// adjust changes the value of the dynamic timeout based on the
|
||||||
// previous results
|
// previous results
|
||||||
func (dt *dynamicTimeout) adjust(entries [dynamicTimeoutLogSize]time.Duration) {
|
func (dt *dynamicTimeout) adjust(entries [dynamicTimeoutLogSize]time.Duration) {
|
||||||
|
failures, max := 0, time.Duration(0)
|
||||||
failures, average := 0, int64(0)
|
for _, dur := range entries[:] {
|
||||||
for i := 0; i < len(entries); i++ {
|
if dur == maxDuration {
|
||||||
if entries[i] == maxDuration {
|
|
||||||
failures++
|
failures++
|
||||||
} else {
|
} else if dur > max {
|
||||||
average += int64(entries[i])
|
max = dur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if failures < len(entries) {
|
|
||||||
average /= int64(len(entries) - failures)
|
|
||||||
}
|
|
||||||
|
|
||||||
timeOutHitPct := float64(failures) / float64(len(entries))
|
failPct := float64(failures) / float64(len(entries))
|
||||||
|
|
||||||
if timeOutHitPct > dynamicTimeoutIncreaseThresholdPct {
|
if failPct > dynamicTimeoutIncreaseThresholdPct {
|
||||||
// We are hitting the timeout too often, so increase the timeout by 25%
|
// We are hitting the timeout too often, so increase the timeout by 25%
|
||||||
timeout := atomic.LoadInt64(&dt.timeout) * 125 / 100
|
timeout := atomic.LoadInt64(&dt.timeout) * 125 / 100
|
||||||
atomic.StoreInt64(&dt.timeout, timeout)
|
|
||||||
} else if timeOutHitPct < dynamicTimeoutDecreaseThresholdPct {
|
|
||||||
// We are hitting the timeout relatively few times, so decrease the timeout
|
|
||||||
average = average * 125 / 100 // Add buffer of 25% on top of average
|
|
||||||
|
|
||||||
timeout := (atomic.LoadInt64(&dt.timeout) + average) / 2 // Middle between current timeout and average success
|
// Set upper cap.
|
||||||
|
if timeout > int64(maxDynamicTimeout) {
|
||||||
|
timeout = int64(maxDynamicTimeout)
|
||||||
|
}
|
||||||
|
// Safety, shouldn't happen
|
||||||
|
if timeout < dt.minimum {
|
||||||
|
timeout = dt.minimum
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(&dt.timeout, timeout)
|
||||||
|
} else if failPct < dynamicTimeoutDecreaseThresholdPct {
|
||||||
|
// We are hitting the timeout relatively few times,
|
||||||
|
// so decrease the timeout towards 25 % of maximum time spent.
|
||||||
|
max = max * 125 / 100
|
||||||
|
|
||||||
|
timeout := atomic.LoadInt64(&dt.timeout)
|
||||||
|
if max < time.Duration(timeout) {
|
||||||
|
// Move 50% toward the max.
|
||||||
|
timeout = (int64(max) + timeout) / 2
|
||||||
|
}
|
||||||
if timeout < dt.minimum {
|
if timeout < dt.minimum {
|
||||||
timeout = dt.minimum
|
timeout = dt.minimum
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&dt.timeout, timeout)
|
atomic.StoreInt64(&dt.timeout, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,8 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -98,7 +100,7 @@ func TestDynamicTimeoutDualDecrease(t *testing.T) {
|
|||||||
adjustedAgain := timeout.Timeout()
|
adjustedAgain := timeout.Timeout()
|
||||||
|
|
||||||
if initial <= adjusted || adjusted <= adjustedAgain {
|
if initial <= adjusted || adjusted <= adjustedAgain {
|
||||||
t.Errorf("Failure to decrease timeout multiple times")
|
t.Errorf("Failure to decrease timeout multiple times, initial: %v, adjusted: %v, again: %v", initial, adjusted, adjustedAgain)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,6 +125,30 @@ func TestDynamicTimeoutManyDecreases(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDynamicTimeoutConcurrent(t *testing.T) {
|
||||||
|
// Race test.
|
||||||
|
timeout := newDynamicTimeout(time.Second, time.Millisecond)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
rng := rand.New(rand.NewSource(int64(i)))
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
timeout.LogFailure()
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
timeout.LogSuccess(time.Duration(float64(time.Second) * rng.Float64()))
|
||||||
|
}
|
||||||
|
to := timeout.Timeout()
|
||||||
|
if to < time.Millisecond || to > time.Second {
|
||||||
|
panic(to)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestDynamicTimeoutHitMinimum(t *testing.T) {
|
func TestDynamicTimeoutHitMinimum(t *testing.T) {
|
||||||
|
|
||||||
const minimum = 30 * time.Second
|
const minimum = 30 * time.Second
|
||||||
@ -168,7 +194,7 @@ func TestDynamicTimeoutAdjustExponential(t *testing.T) {
|
|||||||
|
|
||||||
timeout := newDynamicTimeout(time.Minute, time.Second)
|
timeout := newDynamicTimeout(time.Minute, time.Second)
|
||||||
|
|
||||||
rand.Seed(time.Now().UTC().UnixNano())
|
rand.Seed(0)
|
||||||
|
|
||||||
initial := timeout.Timeout()
|
initial := timeout.Timeout()
|
||||||
|
|
||||||
@ -188,7 +214,7 @@ func TestDynamicTimeoutAdjustNormalized(t *testing.T) {
|
|||||||
|
|
||||||
timeout := newDynamicTimeout(time.Minute, time.Second)
|
timeout := newDynamicTimeout(time.Minute, time.Second)
|
||||||
|
|
||||||
rand.Seed(time.Now().UTC().UnixNano())
|
rand.Seed(0)
|
||||||
|
|
||||||
initial := timeout.Timeout()
|
initial := timeout.Timeout()
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ const (
|
|||||||
leaderLockTimeoutSleepInterval = time.Hour
|
leaderLockTimeoutSleepInterval = time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
var leaderLockTimeout = newDynamicTimeout(30*time.Second, time.Minute)
|
var leaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second)
|
||||||
|
|
||||||
// NewBgHealSequence creates a background healing sequence
|
// NewBgHealSequence creates a background healing sequence
|
||||||
// operation which crawls all objects and heal them.
|
// operation which crawls all objects and heal them.
|
||||||
|
@ -450,7 +450,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
wquorum := InsufficientWriteQuorum{}
|
wquorum := InsufficientWriteQuorum{}
|
||||||
|
|
||||||
// allocate dynamic timeout once before the loop
|
// allocate dynamic timeout once before the loop
|
||||||
iamLockTimeout := newDynamicTimeout(3*time.Second, 5*time.Second)
|
iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
|
||||||
|
|
||||||
for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
|
for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
|
||||||
// let one of the server acquire the lock, if not let them timeout.
|
// let one of the server acquire the lock, if not let them timeout.
|
||||||
|
@ -231,7 +231,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// allocate dynamic timeout once before the loop
|
// allocate dynamic timeout once before the loop
|
||||||
configLockTimeout := newDynamicTimeout(3*time.Second, 5*time.Second)
|
configLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
|
||||||
|
|
||||||
// **** WARNING ****
|
// **** WARNING ****
|
||||||
// Migrating to encrypted backend should happen before initialization of any
|
// Migrating to encrypted backend should happen before initialization of any
|
||||||
|
Loading…
Reference in New Issue
Block a user