mirror of https://github.com/minio/minio.git
lock: Retry locking with an increasing random interval (#17200)
This commit is contained in:
parent
b62791617c
commit
684399433b
|
@ -34,23 +34,29 @@ import (
|
|||
// Indicator if logging is enabled.
|
||||
var dsyncLog bool
|
||||
|
||||
// maximum time to sleep before retrying a failed blocking lock()
|
||||
var lockRetryInterval time.Duration
|
||||
// Retry unit interval
|
||||
var lockRetryMinInterval time.Duration
|
||||
|
||||
var lockRetryBackOff func(*rand.Rand, uint) time.Duration
|
||||
|
||||
func init() {
|
||||
// Check for MINIO_DSYNC_TRACE env variable, if set logging will be enabled for failed REST operations.
|
||||
dsyncLog = os.Getenv("_MINIO_DSYNC_TRACE") == "1"
|
||||
|
||||
// lockRetryInterval specifies the maximum time between retries for failed locks.
|
||||
// Average retry time will be value / 2.
|
||||
lockRetryInterval = 250 * time.Millisecond
|
||||
lockRetryMinInterval = 250 * time.Millisecond
|
||||
if lri := os.Getenv("_MINIO_LOCK_RETRY_INTERVAL"); lri != "" {
|
||||
v, err := strconv.Atoi(lri)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
lockRetryInterval = time.Duration(v) * time.Millisecond
|
||||
lockRetryMinInterval = time.Duration(v) * time.Millisecond
|
||||
}
|
||||
|
||||
lockRetryBackOff = backoffWait(
|
||||
lockRetryMinInterval,
|
||||
100*time.Millisecond,
|
||||
5*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
func log(format string, data ...interface{}) {
|
||||
|
@ -103,15 +109,15 @@ var DefaultTimeouts = Timeouts{
|
|||
|
||||
// A DRWMutex is a distributed mutual exclusion lock.
|
||||
type DRWMutex struct {
|
||||
Names []string
|
||||
writeLocks []string // Array of nodes that granted a write lock
|
||||
readLocks []string // Array of array of nodes that granted reader locks
|
||||
rng *rand.Rand
|
||||
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
|
||||
clnt *Dsync
|
||||
cancelRefresh context.CancelFunc
|
||||
refreshInterval time.Duration
|
||||
lockRetryInterval time.Duration
|
||||
Names []string
|
||||
writeLocks []string // Array of nodes that granted a write lock
|
||||
readLocks []string // Array of array of nodes that granted reader locks
|
||||
rng *rand.Rand
|
||||
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
|
||||
clnt *Dsync
|
||||
cancelRefresh context.CancelFunc
|
||||
refreshInterval time.Duration
|
||||
lockRetryMinInterval time.Duration
|
||||
}
|
||||
|
||||
// Granted - represents a structure of a granted lock.
|
||||
|
@ -133,13 +139,13 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
|
|||
restClnts, _ := clnt.GetLockers()
|
||||
sort.Strings(names)
|
||||
return &DRWMutex{
|
||||
writeLocks: make([]string, len(restClnts)),
|
||||
readLocks: make([]string, len(restClnts)),
|
||||
Names: names,
|
||||
clnt: clnt,
|
||||
rng: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
|
||||
refreshInterval: drwMutexRefreshInterval,
|
||||
lockRetryInterval: lockRetryInterval,
|
||||
writeLocks: make([]string, len(restClnts)),
|
||||
readLocks: make([]string, len(restClnts)),
|
||||
Names: names,
|
||||
clnt: clnt,
|
||||
rng: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
|
||||
refreshInterval: drwMutexRefreshInterval,
|
||||
lockRetryMinInterval: lockRetryMinInterval,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,6 +231,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
|
|||
log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v, quorum: %d, tolerance: %d, lockClients: %d\n", id, source, dm.Names, isReadLock, opts, quorum, tolerance, len(restClnts))
|
||||
|
||||
tolerance = len(restClnts) - quorum
|
||||
attempt := uint(0)
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -251,14 +258,15 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
|
|||
return locked
|
||||
}
|
||||
|
||||
lockRetryInterval := dm.lockRetryInterval
|
||||
if opts.RetryInterval != 0 {
|
||||
lockRetryInterval = opts.RetryInterval
|
||||
}
|
||||
if lockRetryInterval < 0 {
|
||||
switch {
|
||||
case opts.RetryInterval < 0:
|
||||
return false
|
||||
case opts.RetryInterval > 0:
|
||||
time.Sleep(opts.RetryInterval)
|
||||
default:
|
||||
attempt++
|
||||
time.Sleep(lockRetryBackOff(dm.rng, attempt))
|
||||
}
|
||||
time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -631,7 +639,7 @@ func (dm *DRWMutex) Unlock(ctx context.Context) {
|
|||
|
||||
isReadLock := false
|
||||
for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
|
||||
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
|
||||
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryMinInterval)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -672,7 +680,7 @@ func (dm *DRWMutex) RUnlock(ctx context.Context) {
|
|||
|
||||
isReadLock := true
|
||||
for !releaseAll(ctx, dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
|
||||
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
|
||||
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryMinInterval)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package dsync
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
func backoffWait(min, unit, cap time.Duration) func(*rand.Rand, uint) time.Duration {
|
||||
if unit > time.Hour {
|
||||
// Protect against integer overflow
|
||||
panic("unit cannot exceed one hour")
|
||||
}
|
||||
return func(r *rand.Rand, attempt uint) time.Duration {
|
||||
sleep := min
|
||||
sleep += unit * time.Duration(attempt)
|
||||
if sleep > cap {
|
||||
sleep = cap
|
||||
}
|
||||
sleep -= time.Duration(r.Float64() * float64(sleep))
|
||||
return sleep
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue