lock: Vendorize all the new changes made in minio/dsync (#4154)

Fixes #4139
Harshavardhana 2017-04-19 14:22:35 -07:00 committed by GitHub
parent 5a3c5aec31
commit f1d7780167
4 changed files with 208 additions and 102 deletions

vendor/github.com/minio/dsync/README.md vendored
View File

@@ -1,4 +1,4 @@
dsync
dsync [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io) [![Go Report Card](https://goreportcard.com/badge/minio/minio)](https://goreportcard.com/report/minio/minio) [![codecov](https://codecov.io/gh/minio/dsync/branch/master/graph/badge.svg)](https://codecov.io/gh/minio/dsync)
=====
A distributed locking and syncing package for Go.
@@ -16,7 +16,7 @@ This package was developed for the distributed server version of [Minio Object S
For [minio](https://minio.io/) the distributed version is started as follows (for a 6-server system):
```
$ minio server server1:/disk server2:/disk server3:/disk server4:/disk server5:/disk server6:/disk
$ minio server http://server1/disk http://server2/disk http://server3/disk http://server4/disk http://server5/disk http://server6/disk
```
_(note that the identical command should be run on servers `server1` through `server6`)_
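For orientation, here is a minimal usage sketch of the package being vendored. `NewDRWMutex` and the `Lock`/`Unlock` methods appear in this diff; the `dsync.Init` call and the `NetLocker` client setup are assumptions about how the package is initialized at this revision:

```
package main

import (
	"log"

	"github.com/minio/dsync"
)

func main() {
	// Assumption: one NetLocker RPC client per server in the cluster.
	// Building these clients is deployment specific and omitted here.
	var clnts []dsync.NetLocker

	// Assumption: Init registers the client list and this node's own
	// index with the package before any mutex is created.
	if err := dsync.Init(clnts, 0); err != nil {
		log.Fatalln("dsync init failed:", err)
	}

	// Create a distributed RW mutex for the resource named "test".
	dm := dsync.NewDRWMutex("test")

	dm.Lock() // blocks until a quorum of nodes grants the lock
	// ... critical section, protected across the whole cluster ...
	dm.Unlock()
}
```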
@@ -57,7 +57,7 @@ This table shows test performance on the same (EC2) instance type but with a var
| c3.2xlarge | 12 | (min=1239, max=1558) | 16782 | 25% |
| c3.2xlarge | 16 | (min=996, max=1391) | 19096 | 25% |
The mix and max locks/server/sec gradually declines but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
The min and max locks/server/sec gradually decline, but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
### Performance with different instance types

vendor/github.com/minio/dsync/drwmutex.go vendored
View File

@@ -20,9 +20,6 @@ import (
cryptorand "crypto/rand"
"fmt"
golog "log"
"math"
"math/rand"
"net"
"os"
"sync"
"time"
@@ -43,7 +40,7 @@ func log(msg ...interface{}) {
}
// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before timing out.
const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms.
const DRWMutexAcquireTimeout = 1 * time.Second // 1 second.
// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
@@ -53,19 +50,21 @@ type DRWMutex struct {
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
}
// Granted - represents a structure of a granted lock.
type Granted struct {
index int
lockUid string // Locked if set with UID string, unlocked if empty
lockUID string // Locked if set with UID string, unlocked if empty
}
func (g *Granted) isLocked() bool {
return isLocked(g.lockUid)
return isLocked(g.lockUID)
}
func isLocked(uid string) bool {
return len(uid) > 0
}
// NewDRWMutex - initializes a new dsync RW mutex.
func NewDRWMutex(name string) *DRWMutex {
return &DRWMutex{
Name: name,
@@ -98,22 +97,24 @@ func (dm *DRWMutex) RLock() {
// The call will block until the lock is granted using a built-in
// timing randomized back-off algorithm to try again until successful
func (dm *DRWMutex) lockBlocking(isReadLock bool) {
doneCh := make(chan struct{})
defer close(doneCh)
runs, backOff := 1, 1
for {
// create temp array on stack
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
for range newRetryTimerSimple(doneCh) {
// Create temp array on stack.
locks := make([]string, dnodeCount)
// try to acquire the lock
// Try to acquire the lock.
success := lock(clnts, &locks, dm.Name, isReadLock)
if success {
dm.m.Lock()
defer dm.m.Unlock()
// if success, copy array to object
// If success, copy array to object
if isReadLock {
// append new array of strings at the end
// Append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
@@ -123,38 +124,31 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) {
return
}
// We timed out on the previous lock, incrementally wait for a longer back-off time,
// and try again afterwards
time.Sleep(time.Duration(backOff) * time.Millisecond)
backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
if backOff > 1024 {
backOff = backOff % 64
runs = 1 // reset runs
} else if runs < 10 {
runs++
}
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
}
}
// lock tries to acquire the distributed lock, returning true or false
//
// lock tries to acquire the distributed lock, returning true or false.
func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) bool {
// Create buffered channel of size equal to total number of nodes.
ch := make(chan Granted, dnodeCount)
defer close(ch)
var wg sync.WaitGroup
for index, c := range clnts {
wg.Add(1)
// broadcast lock request to all nodes
go func(index int, isReadLock bool, c NetLocker) {
defer wg.Done()
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
bytesUid := [16]byte{}
cryptorand.Read(bytesUid[:])
uid := fmt.Sprintf("%X", bytesUid[:])
bytesUID := [16]byte{}
cryptorand.Read(bytesUID[:])
uid := fmt.Sprintf("%X", bytesUID[:])
args := LockArgs{
UID: uid,
@@ -177,8 +171,9 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
g := Granted{index: index}
if locked {
g.lockUid = args.UID
g.lockUID = args.UID
}
ch <- g
}(index, isReadLock, c)
@@ -186,11 +181,15 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
quorum := false
var wg sync.WaitGroup
wg.Add(1)
go func(isReadLock bool) {
// Wait until we have either a) received all lock responses, b) received too many 'non-'locks for quorum to be or c) time out
// Wait until we have either
//
// a) received all lock responses
// b) received too many 'non-'locks for quorum to be still possible
// c) time out
//
i, locksFailed := 0, 0
done := false
timeout := time.After(DRWMutexAcquireTimeout)
@@ -201,20 +200,19 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
case grant := <-ch:
if grant.isLocked() {
// Mark that this node has acquired the lock
(*locks)[grant.index] = grant.lockUid
(*locks)[grant.index] = grant.lockUID
} else {
locksFailed++
if !isReadLock && locksFailed > dnodeCount-dquorum ||
isReadLock && locksFailed > dnodeCount-dquorumReads {
// We know that we are not going to get the lock anymore, so exit out
// and release any locks that did get acquired
// We know that we are not going to get the lock anymore,
// so exit out and release any locks that did get acquired
done = true
// Increment the number of grants received from the buffered channel.
i++
releaseAll(clnts, locks, lockName, isReadLock)
}
}
case <-timeout:
done = true
// timeout happened, maybe one of the nodes is slow, count
@@ -229,7 +227,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
}
}
// Count locks in order to determine whterh we have quorum or not
// Count locks in order to determine whether we have quorum or not
quorum = quorumMet(locks, isReadLock)
// Signal that we have the quorum
@@ -242,7 +240,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
grantToBeReleased := <-ch
if grantToBeReleased.isLocked() {
// release lock
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUid, isReadLock)
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
}
}
}(isReadLock)
@@ -269,11 +267,14 @@ func quorumMet(locks *[]string, isReadLock bool) bool {
}
}
var quorum bool
if isReadLock {
return count >= dquorumReads
quorum = count >= dquorumReads
} else {
return count >= dquorum
quorum = count >= dquorum
}
return quorum
}
// releaseAll releases all locks that are marked as locked
@@ -360,7 +361,6 @@ func unlock(locks []string, name string, isReadLock bool) {
// ForceUnlock will forcefully clear a write or read lock.
func (dm *DRWMutex) ForceUnlock() {
{
dm.m.Lock()
defer dm.m.Unlock()
@@ -379,61 +379,25 @@ func (dm *DRWMutex) ForceUnlock() {
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c NetLocker, name, uid string, isReadLock bool) {
backOffArray := []time.Duration{
30 * time.Second, // 30secs.
1 * time.Minute, // 1min.
3 * time.Minute, // 3min.
10 * time.Minute, // 10min.
30 * time.Minute, // 30min.
1 * time.Hour, // 1hr.
args := LockArgs{
UID: uid,
Resource: name,
ServerAddr: clnts[ownNode].ServerAddr(),
ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
}
go func(c NetLocker, name string) {
for _, backOff := range backOffArray {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
args := LockArgs{
UID: uid,
Resource: name,
ServerAddr: clnts[ownNode].ServerAddr(),
ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
}
var err error
if len(uid) == 0 {
if _, err = c.ForceUnlock(args); err != nil {
log("Unable to call ForceUnlock", err)
}
} else if isReadLock {
if _, err = c.RUnlock(args); err != nil {
log("Unable to call RUnlock", err)
}
} else {
if _, err = c.Unlock(args); err != nil {
log("Unable to call Unlock", err)
}
}
if err != nil {
// Ignore the error if it is a net.Error that occurred due to a timeout.
// The cause could have been a server timestamp mismatch or the server may have restarted.
// FIXME: This is minio-specific behaviour and we would need a way to make it generic.
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
err = nil
}
}
if err == nil {
return
}
// Wait..
time.Sleep(backOff)
if len(uid) == 0 {
if _, err := c.ForceUnlock(args); err != nil {
log("Unable to call ForceUnlock", err)
}
}(c, name)
} else if isReadLock {
if _, err := c.RUnlock(args); err != nil {
log("Unable to call RUnlock", err)
}
} else {
if _, err := c.Unlock(args); err != nil {
log("Unable to call Unlock", err)
}
}
}
// DRLocker returns a sync.Locker interface that implements

vendor/github.com/minio/dsync/retry.go generated vendored Normal file
View File

@@ -0,0 +1,142 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"math/rand"
"sync"
"time"
)
// lockedRandSource provides protected rand source, implements rand.Source interface.
type lockedRandSource struct {
lk sync.Mutex
src rand.Source
}
// Int63 returns a non-negative pseudo-random 63-bit integer as an
// int64.
func (r *lockedRandSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}
// Seed uses the provided seed value to initialize the generator to a
// deterministic state.
func (r *lockedRandSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
// MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0
// NoJitter disables the use of jitter for randomizing the
// exponential backoff time
const NoJitter = 0.0
// Global random source for fetching random values.
var globalRandomSource = rand.New(&lockedRandSource{
src: rand.NewSource(time.Now().UTC().UnixNano()),
})
// newRetryTimerWithJitter creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. This function is the fully
// configurable version, meant only for advanced use cases. For the most part
// one should use newRetryTimerSimple and newRetryTimer.
func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
attemptCh := make(chan int)
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// 1<<uint(attempt) below could overflow, so limit the value of attempt
maxAttempt := 30
if attempt > maxAttempt {
attempt = maxAttempt
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(globalRandomSource.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
nextBackoff := 0
// Channel used to signal after the expiry of backoff wait seconds.
var timer *time.Timer
for {
select { // Attempts starts.
case attemptCh <- nextBackoff:
nextBackoff++
case <-doneCh:
// Stop the routine.
return
}
timer = time.NewTimer(exponentialBackoffWait(nextBackoff))
// wait till next backoff time or till doneCh gets a message.
select {
case <-timer.C:
case <-doneCh:
// stop the timer and return.
timer.Stop()
return
}
}
}()
// Start reading..
return attemptCh
}
// Default retry constants.
const (
defaultRetryUnit = time.Second // 1 second.
defaultRetryCap = 1 * time.Second // 1 second.
)
// newRetryTimer creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. This function provides
// retry values with maximum jitter.
func newRetryTimer(unit time.Duration, cap time.Duration, doneCh chan struct{}) <-chan int {
return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh)
}
// newRetryTimerSimple creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. This is a simpler version
// with all default values.
func newRetryTimerSimple(doneCh chan struct{}) <-chan int {
return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
}
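This channel is exactly what the reworked `lockBlocking` above consumes via `for range newRetryTimerSimple(doneCh)`, replacing the hand-rolled `time.Sleep` back-off that this commit deletes. A standalone sketch of the consumption pattern, with a hypothetical `tryOnce` standing in for the real lock attempt:

```
// retryUntilSuccess illustrates the intended usage: the first attempt
// fires immediately, later ones after exponentially increasing, jittered
// delays (capped at defaultRetryCap). Closing doneCh stops the timer
// goroutine, which closes the attempt channel and ends the range loop.
func retryUntilSuccess(tryOnce func() bool) {
	doneCh := make(chan struct{})
	defer close(doneCh)

	for range newRetryTimerSimple(doneCh) {
		if tryOnce() { // hypothetical stand-in for the lock attempt
			return
		}
	}
}
```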

vendor/vendor.json vendored
View File

@@ -193,10 +193,10 @@
"revisionTime": "2017-02-27T07:32:28Z"
},
{
"checksumSHA1": "NBGyq2+iTtJvJ+ElG4FzHLe1WSY=",
"checksumSHA1": "vrIbl0L+RLwyPRCxMss5+eZtADE=",
"path": "github.com/minio/dsync",
"revision": "9cafd4d729eb71b31ef7851a8c8f6ceb855d0915",
"revisionTime": "2016-12-23T07:07:24Z"
"revision": "535db94aebce49cacce4de9c6f5f5821601281cd",
"revisionTime": "2017-04-19T20:41:15Z"
},
{
"path": "github.com/minio/go-homedir",