Upgrade to new dsync version incl. stale lock detection (#2708)

This commit is contained in:
Frank 2016-09-16 09:30:55 +02:00 committed by Harshavardhana
parent 7a549096de
commit df2ef64d20
9 changed files with 735 additions and 240 deletions

View File

@ -161,3 +161,19 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
}
return err
}
// Node returns the node (network address) of the connection
func (authClient *AuthRPCClient) Node() string {
if authClient.rpc != nil {
return authClient.rpc.node
}
return ""
}
// RPCPath returns the RPC path of the connection
func (authClient *AuthRPCClient) RPCPath() string {
if authClient.rpc != nil {
return authClient.rpc.rpcPath
}
return ""
}

View File

@ -18,6 +18,7 @@ package cmd
import (
"fmt"
"math/rand"
"net/rpc"
"path"
"strings"
@ -28,6 +29,8 @@ import (
)
const lockRPCPath = "/minio/lock"
const lockMaintenanceLoop = 1 * time.Minute
const lockCheckValidityInterval = 2 * time.Minute
// LockArgs, besides the lock name, holds Token and Timestamp for session
// authentication and for validation across server restarts.
@ -35,6 +38,9 @@ type LockArgs struct {
Name string
Token string
Timestamp time.Time
Node string
RPCPath string
UID string
}
// SetToken - sets the token to the supplied value.
@ -47,12 +53,26 @@ func (l *LockArgs) SetTimestamp(tstamp time.Time) {
l.Timestamp = tstamp
}
// lockRequesterInfo stores various info from the client for each lock that is requested
type lockRequesterInfo struct {
writer bool // Bool whether write or read lock
node string // Network address of client claiming lock
rpcPath string // RPC path of client claiming lock
uid string // Uid to uniquely identify request of client
timestamp time.Time // Timestamp set at the time of initialization
timeLastCheck time.Time // Timestamp for last check of validity of lock
}
// isWriteLock returns whether the lock is a write or read lock
func isWriteLock(lri []lockRequesterInfo) bool {
return len(lri) == 1 && lri[0].writer
}
// lockServer is type for RPC handlers
type lockServer struct {
rpcPath string
mutex sync.Mutex
// e.g, when a Lock(name) is held, map[string][]bool{"name" : []bool{true}}
// when one or more RLock() is held, map[string][]bool{"name" : []bool{false, false}}
lockMap map[string][]bool
rpcPath string
mutex sync.Mutex
lockMap map[string][]lockRequesterInfo
timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart.
}
@ -93,15 +113,11 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
if err := l.verifyArgs(args); err != nil {
return err
}
_, ok := l.lockMap[args.Name]
// No locks held on the given name.
if !ok {
*reply = true
l.lockMap[args.Name] = []bool{true}
} else {
// Either a read or write lock is held on the given name.
*reply = false
_, *reply = l.lockMap[args.Name]
if !*reply { // No locks held on the given name, so claim write lock
l.lockMap[args.Name] = []lockRequesterInfo{lockRequesterInfo{writer: true, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()}}
}
*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
return nil
}
@ -112,19 +128,18 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
if err := l.verifyArgs(args); err != nil {
return err
}
locksHeld, ok := l.lockMap[args.Name]
// No lock is held on the given name, there must be some issue at the lock client side.
if !ok {
*reply = false
return fmt.Errorf("Unlock attempted on an un-locked entity: %s", args.Name)
} else if len(locksHeld) == 1 && locksHeld[0] == true {
*reply = true
delete(l.lockMap, args.Name)
return nil
} else {
*reply = false
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(locksHeld))
var lri []lockRequesterInfo
lri, *reply = l.lockMap[args.Name]
if !*reply { // No lock is held on the given name
return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = isWriteLock(lri); !*reply { // Unless it is a write lock
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(lri))
}
if l.removeEntry(args.Name, args.UID, &lri) {
return nil
}
return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID)
}
// RLock - rpc handler for read lock operation.
@ -134,19 +149,15 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
if err := l.verifyArgs(args); err != nil {
return err
}
locksHeld, ok := l.lockMap[args.Name]
// No locks held on the given name.
if !ok {
// First read-lock to be held on *name.
l.lockMap[args.Name] = []bool{false}
var lri []lockRequesterInfo
lri, *reply = l.lockMap[args.Name]
if !*reply { // No locks held on the given name, so claim (first) read lock
l.lockMap[args.Name] = []lockRequesterInfo{lockRequesterInfo{writer: false, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()}}
*reply = true
} else if len(locksHeld) == 1 && locksHeld[0] == true {
// A write-lock is held, read lock can't be granted.
*reply = false
} else {
// Add an entry for this read lock.
l.lockMap[args.Name] = append(locksHeld, false)
*reply = true
if *reply = !isWriteLock(lri); *reply { // Unless there is a write lock
l.lockMap[args.Name] = append(l.lockMap[args.Name], lockRequesterInfo{writer: false, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()})
}
}
return nil
}
@ -158,26 +169,132 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
if err := l.verifyArgs(args); err != nil {
return err
}
locksHeld, ok := l.lockMap[args.Name]
if !ok {
*reply = false
return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", args.Name)
} else if len(locksHeld) == 1 && locksHeld[0] == true {
// A write-lock is held, cannot release a read lock
*reply = false
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name)
} else if len(locksHeld) > 1 {
// Remove one of the read locks held.
locksHeld = locksHeld[1:]
l.lockMap[args.Name] = locksHeld
*reply = true
} else {
// Delete the map entry since this is the last read lock held
// on *name.
delete(l.lockMap, args.Name)
*reply = true
var lri []lockRequesterInfo
if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = !isWriteLock(lri); !*reply { // A write-lock is held, cannot release a read lock
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name)
}
if l.removeEntry(args.Name, args.UID, &lri) {
return nil
}
return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
}
// Active - rpc handler for active lock status.
func (l *lockServer) Active(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if err := l.verifyArgs(args); err != nil {
return err
}
var lri []lockRequesterInfo
if lri, *reply = l.lockMap[args.Name]; !*reply {
return nil // No lock is held on the given name so return false
}
// Check whether uid is still active
for _, entry := range lri {
if *reply = entry.uid == args.UID; *reply {
return nil // When uid found return true
}
}
return nil // None found so return false
}
// removeEntry removes, based on the uid of the lock message, either a single entry from the
// lockRequesterInfo array or the whole array from the map (in case of a write lock or the last read lock)
func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool {
// Find correct entry to remove based on uid
for index, entry := range *lri {
if entry.uid == uid {
if len(*lri) == 1 {
delete(l.lockMap, name) // Remove the (last) lock
} else {
// Remove the appropriate read lock
*lri = append((*lri)[:index], (*lri)[index+1:]...)
l.lockMap[name] = *lri
}
return true
}
}
return false
}
// nameLockRequesterInfoPair is a helper type for lock maintenance
type nameLockRequesterInfoPair struct {
name string
lri lockRequesterInfo
}
// getLongLivedLocks returns locks that are older than a certain time and
// have not been checked for validity too recently
func getLongLivedLocks(m map[string][]lockRequesterInfo, interval time.Duration) []nameLockRequesterInfoPair {
rslt := []nameLockRequesterInfoPair{}
for name, lriArray := range m {
for idx := range lriArray {
// Check whether enough time has gone by since last check
if time.Since(lriArray[idx].timeLastCheck) >= interval {
rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]})
lriArray[idx].timeLastCheck = time.Now()
}
}
}
return rslt
}
// lockMaintenance loops over locks that have been active for some time and checks back
// with the original server whether it is still alive or not
func (l *lockServer) lockMaintenance(interval time.Duration) {
l.mutex.Lock()
// get list of locks to check
nlripLongLived := getLongLivedLocks(l.lockMap, interval)
l.mutex.Unlock()
for _, nlrip := range nlripLongLived {
c := newClient(nlrip.lri.node, nlrip.lri.rpcPath)
var active bool
// Call back to the original server to verify whether the lock is still active (based on name & uid)
if err := c.Call("Dsync.Active", &LockArgs{Name: nlrip.name, UID: nlrip.lri.uid}, &active); err != nil {
// We failed to connect back to the server that originated the lock, this can either be due to
// - server at client down
// - some network error (and server is up normally)
//
// We will ignore the error and retry later to resolve this lock
c.Close()
} else {
c.Close()
if !active { // The lock is no longer active at server that originated the lock
// so remove the lock from the map
l.mutex.Lock()
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
if lri, ok := l.lockMap[nlrip.name]; ok {
if !l.removeEntry(nlrip.name, nlrip.lri.uid, &lri) {
// Remove failed, in case it is a:
if nlrip.lri.writer {
// Writer: this should never happen as the whole (mapped) entry should have been deleted
log.Errorln("Lock maintenance failed to remove entry for write lock (should never happen)", nlrip.name, nlrip.lri, lri)
} else {
// Reader: this can happen if multiple read locks were active and the one we are looking for
// has been released concurrently (so it is fine)
}
} else {
// remove went okay, all is fine
}
}
l.mutex.Unlock()
}
}
}
}
// Initialize distributed lock.
@ -205,12 +322,26 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
if idx := strings.LastIndex(export, ":"); idx != -1 {
export = export[idx+1:]
}
lockServers = append(lockServers, &lockServer{
// Create handler for lock RPCs
locker := &lockServer{
rpcPath: export,
mutex: sync.Mutex{},
lockMap: make(map[string][]bool),
lockMap: make(map[string][]lockRequesterInfo),
timestamp: time.Now().UTC(),
})
}
// Start loop for stale lock maintenance
go func() {
// Start with random sleep time, so as to avoid "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceLoop)))
for {
time.Sleep(lockMaintenanceLoop)
locker.lockMaintenance(lockCheckValidityInterval)
}
}()
lockServers = append(lockServers, locker)
}
}
return lockServers

View File

@ -38,6 +38,7 @@ func initDsyncNodes(disks []string, port int) error {
cred := serverConfig.GetCredential()
// Initialize rpc lock client information only if this instance is a distributed setup.
var clnts []dsync.RPC
myNode := -1
for _, disk := range disks {
if idx := strings.LastIndex(disk, ":"); idx != -1 {
clnts = append(clnts, newAuthClient(&authConfig{
@ -49,9 +50,14 @@ func initDsyncNodes(disks []string, port int) error {
path: pathutil.Join(lockRPCPath, disk[idx+1:]),
loginMethod: "Dsync.LoginHandler",
}))
if isLocalStorage(disk) && myNode == -1 {
myNode = len(clnts) - 1
}
}
}
return dsync.SetNodesWithClients(clnts)
return dsync.SetNodesWithClients(clnts, myNode)
}
// initNSLock - initialize name space lock map.
@ -86,9 +92,8 @@ type nsParam struct {
// nsLock - provides primitives for locking critical namespace regions.
type nsLock struct {
writer RWLocker
readerArray []RWLocker
ref uint
RWLocker
ref uint
}
// nsLockMap - namespace lock map, provides primitives to Lock,
@ -114,7 +119,7 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock
nsLk, found := n.lockMap[param]
if !found {
nsLk = &nsLock{
writer: func() RWLocker {
RWLocker: func() RWLocker {
if n.isDist {
return dsync.NewDRWMutex(pathutil.Join(volume, path))
}
@ -125,10 +130,6 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock
n.lockMap[param] = nsLk
}
nsLk.ref++ // Update ref count here to avoid multiple races.
rwlock := nsLk.writer
if readLock && n.isDist {
rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path))
}
if globalDebugLock {
// change the state of the lock to be blocked for the given pair of <volume, path> and <OperationID> till the lock unblocks.
@ -143,21 +144,9 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock
// Locking here can block.
if readLock {
rwlock.RLock()
if n.isDist {
// Only add (for reader case) to array after RLock() succeeds
// (so that we know for sure that element in [0] can be RUnlocked())
n.lockMapMutex.Lock()
if len(nsLk.readerArray) == 0 {
nsLk.readerArray = []RWLocker{rwlock}
} else {
nsLk.readerArray = append(nsLk.readerArray, rwlock)
}
n.lockMapMutex.Unlock()
}
nsLk.RLock()
} else {
rwlock.Lock()
nsLk.Lock()
}
// check if lock debugging enabled.
@ -180,19 +169,9 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
param := nsParam{volume, path}
if nsLk, found := n.lockMap[param]; found {
if readLock {
if n.isDist {
if len(nsLk.readerArray) == 0 {
errorIf(errors.New("Length of reader lock array cannot be 0."), "Invalid reader lock array length detected.")
}
// Release first lock first (FIFO)
nsLk.readerArray[0].RUnlock()
// And discard first element
nsLk.readerArray = nsLk.readerArray[1:]
} else {
nsLk.writer.RUnlock()
}
nsLk.RUnlock()
} else {
nsLk.writer.Unlock()
nsLk.Unlock()
}
if nsLk.ref == 0 {
errorIf(errors.New("Namespace reference count cannot be 0."), "Invalid reference count detected.")
@ -208,10 +187,6 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
}
}
if nsLk.ref == 0 {
if len(nsLk.readerArray) != 0 && n.isDist {
errorIf(errors.New("Length of reader lock array should be 0 upon deleting map entry."), "Invalid reader lock array length detected.")
}
// Remove from the map if there are no more references.
delete(n.lockMap, param)

View File

@ -123,3 +123,13 @@ func (rpcClient *RPCClient) Close() error {
rpcClient.clearRPCClient()
return rpcLocalStack.Close()
}
// Node returns the node (network address) of the connection
func (rpcClient *RPCClient) Node() string {
return rpcClient.node
}
// RPCPath returns the RPC path of the connection
func (rpcClient *RPCClient) RPCPath() string {
return rpcClient.rpcPath
}

View File

@ -1,74 +1,377 @@
dsync
=====
A distributed sync package.
A distributed locking and syncing package for Go.
Introduction
------------
`dsync` is a package for doing distributed locks over a network of `n` nodes. It is designed with simplicity in mind and hence offers limited scalability (`n <= 16`). Each node will be connected to all other nodes and lock requests from any node will be broadcast to all connected nodes. A node will succeed in getting the lock if `n/2 + 1` nodes (including itself) respond positively. If the lock is acquired it can be held for some time and needs to be released afterwards. This will cause the release to be broadcast to all nodes after which the lock becomes available again.
`dsync` is a package for doing distributed locks over a network of `n` nodes. It is designed with simplicity in mind and hence offers limited scalability (`n <= 16`). Each node will be connected to all other nodes and lock requests from any node will be broadcast to all connected nodes. A node will succeed in getting the lock if `n/2 + 1` nodes (which may or may not include itself) respond positively. If the lock is acquired it can be held for as long as the client desires and needs to be released afterwards. This will cause the release to be broadcast to all nodes after which the lock becomes available again.
Motivation
----------
This package was developed for the distributed server version of [Minio Object Storage](https://minio.io/). For this we needed a distributed locking mechanism for up to 16 servers that each would be running `minio server`. The locking mechanism itself should be a reader/writer mutual exclusion lock meaning that it can be held by a single writer or an arbitrary number of readers.
For [minio](https://minio.io/) the distributed version is started as follows (for a 6-server system):
```
$ minio server server1/disk server2/disk server3/disk server4/disk server5/disk server6/disk
```
_(note that the same command should be run on servers `server1` through `server6`)_
Design goals
------------
* Simple design: by keeping the design simple, many tricky edge cases can be avoided.
* No master node: there is no concept of a master node which, if this would be used and the master would be down, causes locking to come to a complete stop. (Unless you have a design with a slave node but this adds yet more complexity.)
* Resilient: if one or more nodes go down, the other nodes should not be affected and can continue to acquire locks (provided not more than `n/2 - 1` nodes are down).
* **Simple design**: by keeping the design simple, many tricky edge cases can be avoided.
* **No master node**: there is no concept of a master node which, were it to go down, would bring locking to a complete stop. (Unless you have a design with a slave node, but this adds yet more complexity.)
* **Resilient**: if one or more nodes go down, the other nodes should not be affected and can continue to acquire locks (provided not more than `n/2 - 1` nodes are down).
* Drop-in replacement for `sync.RWMutex` and supports [`sync.Locker`](https://github.com/golang/go/blob/master/src/sync/mutex.go#L30) interface.
* Automatically reconnect to (restarted) nodes.
* Compatible with `sync/mutex` API.
Restrictions
------------
* Limited scalability: up to 16 nodes.
* Fixed configuration: changes in the number and/or network names/IP addresses need a restart of all nodes in order to take effect.
* If a down node comes up, it will not in any way (re)acquire any locks that it may have held.
* Not designed for high performance applications such as key/value stores
* If a down node comes up, it will not try to (re)acquire any locks that it may have held.
* Not designed for high performance applications such as key/value stores.
Performance
-----------
* Lock requests (successful) should not take longer than 1ms (provided decent network connection of 1 Gbit or more between the nodes)
* Support up to 4000 locks per node per second.
* Scale linearly with the number of locks. For the maximum size case of 16 nodes this means a maximum of 64K locks/sec (and 2048K lock request & release messages/sec)
* Do not take more than (overall) 10% CPU usage
* Support up to a total of 7500 locks/second for maximum size of 16 nodes (consuming 10% CPU usage per server) on moderately powerful server hardware.
* Lock requests (successful) should not take longer than 1ms (provided decent network connection of 1 Gbit or more between the nodes).
Issues
------
The tables below show detailed performance numbers.
* In case the node that has the lock goes down, the lock release will not be broadcast: what do we do? (periodically ping 'back' to requesting node from all nodes that have the lock?) Or detect that the network connection has gone down.
* If one of the nodes that participated in the lock goes down, this is not a problem since (when it comes back online) the node that originally acquired the lock will still have it, and a request for a new lock will fail due to only `n/2` being available.
* If two nodes go down and both participated in the lock then there is a chance that a new lock will acquire locks from `n/2 + 1` nodes and will succeed, so we would have two concurrent locks. One way to counter this would be to monitor the network connections from the nodes that originated the lock, and, upon losing a connection to a node that granted a lock, get a new lock from a free node.
* When two nodes want to acquire the same lock, it is possible for both to just acquire `n` locks and there is no majority winner so both would fail (and presumably fail back to their clients?). This then requires a retry in order to acquire the lock at a later time.
* What if a late acquire response still comes in after the lock has been obtained (quorum is in) and has already been released again?
### Performance with varying number of nodes
Comparison to other techniques
------------------------------
This table shows test performance on the same (EC2) instance type but with a varying number of nodes:
We are well aware that there are more sophisticated systems such as zookeeper, raft, etc but we found that for our limited use case this was adding too much complexity. So if `dsync` does not meet your requirements then you are probably better off using one of those systems.
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
| -------------------- | -----:| --------------------:| ---------------:| ---------:|
| c3.2xlarge | 4 | (min=3110, max=3376) | 12972 | 25% |
| c3.2xlarge | 8 | (min=1884, max=2096) | 15920 | 25% |
| c3.2xlarge | 12 | (min=1239, max=1558) | 16782 | 25% |
| c3.2xlarge | 16 | (min=996, max=1391) | 19096 | 25% |
Performance
-----------
The min and max locks/server/sec gradually decline but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
```
benchmark old ns/op new ns/op delta
BenchmarkMutexUncontended-8 4.22 1164018 +27583264.93%
BenchmarkMutex-8 96.5 1223266 +1267533.16%
BenchmarkMutexSlack-8 120 1192900 +993983.33%
BenchmarkMutexWork-8 108 1239893 +1147949.07%
BenchmarkMutexWorkSlack-8 142 1210129 +852103.52%
BenchmarkMutexNoSpin-8 292 319479 +109310.62%
BenchmarkMutexSpin-8 1163 1270066 +109106.02%
```
### Performance with different instance types
This table shows test performance for a fixed number of 8 nodes on different EC2 instance types:
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
| -------------------- | -----:| --------------------:| ---------------:| ---------:|
| c3.large (2 vCPU) | 8 | (min=823, max=896) | 6876 | 75% |
| c3.2xlarge (8 vCPU) | 8 | (min=1884, max=2096) | 15920 | 25% |
| c3.8xlarge (32 vCPU) | 8 | (min=2601, max=2898) | 21996 | 10% |
With the rise in the number of cores the CPU load decreases and overall performance increases.
### Stress test
Stress test on a c3.8xlarge (32 vCPU) instance type:
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
| -------------------- | -----:| --------------------:| ---------------:| ---------:|
| c3.8xlarge | 8 | (min=2601, max=2898) | 21996 | 10% |
| c3.8xlarge | 8 | (min=4756, max=5227) | 39932 | 20% |
| c3.8xlarge | 8 | (min=7979, max=8517) | 65984 | 40% |
| c3.8xlarge | 8 | (min=9267, max=9469) | 74944 | 50% |
The system can be pushed to 75K locks/sec at 50% CPU load.
Usage
-----
Explain usage
### Exclusive lock
Here is a simple example showing how to protect a single resource (drop-in replacement for `sync.Mutex`):
```
import (
	"log"
	"time"

	"github.com/minio/dsync"
)
func lockSameResource() {
// Create distributed mutex to protect resource 'test'
dm := dsync.NewDRWMutex("test")
dm.Lock()
log.Println("first lock granted")
// Release 1st lock after 5 seconds
go func() {
time.Sleep(5 * time.Second)
log.Println("first lock unlocked")
dm.Unlock()
}()
// Try to acquire lock again, will block until initial lock is released
log.Println("about to lock same resource again...")
dm.Lock()
log.Println("second lock granted")
time.Sleep(2 * time.Second)
dm.Unlock()
}
```
which gives the following output:
```
2016/09/02 14:50:00 first lock granted
2016/09/02 14:50:00 about to lock same resource again...
2016/09/02 14:50:05 first lock unlocked
2016/09/02 14:50:05 second lock granted
```
### Read locks
DRWMutex also supports multiple simultaneous read locks as shown below (analogous to `sync.RWMutex`)
```
func twoReadLocksAndSingleWriteLock() {
drwm := dsync.NewDRWMutex("resource")
drwm.RLock()
log.Println("1st read lock acquired, waiting...")
drwm.RLock()
log.Println("2nd read lock acquired, waiting...")
go func() {
time.Sleep(1 * time.Second)
drwm.RUnlock()
log.Println("1st read lock released, waiting...")
}()
go func() {
time.Sleep(2 * time.Second)
drwm.RUnlock()
log.Println("2nd read lock released, waiting...")
}()
log.Println("Trying to acquire write lock, waiting...")
drwm.Lock()
log.Println("Write lock acquired, waiting...")
time.Sleep(3 * time.Second)
drwm.Unlock()
}
```
which gives the following output:
```
2016/09/02 15:05:20 1st read lock acquired, waiting...
2016/09/02 15:05:20 2nd read lock acquired, waiting...
2016/09/02 15:05:20 Trying to acquire write lock, waiting...
2016/09/02 15:05:22 1st read lock released, waiting...
2016/09/02 15:05:24 2nd read lock released, waiting...
2016/09/02 15:05:24 Write lock acquired, waiting...
```
Basic architecture
------------------
### Lock process
The basic steps in the lock process are as follows (a minimal sketch follows the list):
- broadcast lock message to all `n` nodes
- collect all responses within certain time-out window
- if quorum met (minimally `n/2 + 1` responded positively) then grant lock
- otherwise release all underlying locks and try again after a (semi-)random delay
- release any locks that (still) came in after the time-out window
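The following is a minimal, hypothetical sketch of the quorum-collection step; the `granted` type and `collectQuorum` function below are illustrative, not the actual `dsync` API:
```
import (
	"fmt"
	"time"
)

// granted is a hypothetical response type: which node answered and whether it granted the lock.
type granted struct {
	index  int
	locked bool
}

// collectQuorum reads responses from n nodes and reports whether a simple
// majority (n/2 + 1) granted the lock before the timeout expired.
func collectQuorum(ch <-chan granted, n int, timeout time.Duration) bool {
	quorum := n/2 + 1
	deadline := time.After(timeout)
	acquired := 0
	for i := 0; i < n; i++ {
		select {
		case g := <-ch:
			if g.locked {
				acquired++
			}
		case <-deadline:
			return false // timed out: the caller releases any partial grants and retries
		}
	}
	return acquired >= quorum
}

func quorumExample() {
	ch := make(chan granted, 4)
	for i := 0; i < 4; i++ {
		ch <- granted{index: i, locked: i != 2} // node 2 refuses the lock
	}
	fmt.Println(collectQuorum(ch, 4, 100*time.Millisecond)) // true: 3 of 4 meets the n/2+1 quorum
}
```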
### Unlock process
The unlock process is really simple (see the sketch after this list):
- broadcast unlock message to all nodes that granted the lock
- if a destination is not available, retry with gradually longer back-off window to still deliver
- ignore the 'result' (cover for cases where destination node has gone down and came back up)
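A minimal sketch of delivering the unlock message with a growing back-off window; this is an illustrative fragment (the first back-off value of 30 seconds matches the commit, the later values and the `release` callback are assumptions):
```
import "time"

// sendReleaseWithRetry keeps trying to deliver an unlock message until it
// succeeds, sleeping for an increasingly long back-off window in between.
// Hypothetical sketch; not the actual dsync sendRelease implementation.
func sendReleaseWithRetry(release func() error) {
	backOff := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute}
	for attempt := 0; ; attempt++ {
		if release() == nil {
			return // delivered; the reply itself is ignored by design
		}
		idx := attempt
		if idx >= len(backOff) {
			idx = len(backOff) - 1 // cap at the longest back-off window
		}
		time.Sleep(backOff[idx])
	}
}
```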
Dealing with Stale Locks
------------------------
A 'stale' lock is a lock that is left behind at a node when the client that originally acquired the lock either:
- never released the lock (due to e.g. a crash), or
- is disconnected from the network and hence not able to deliver the unlock message.
Too many stale locks can prevent a new lock on a resource from being acquired, that is, if the sum of the stale locks and the number of down nodes is greater than `n/2 - 1`. In `dsync` a recovery mechanism is implemented to remove stale locks (see [here](https://github.com/minio/dsync/pull/22#issue-176751755) for the details).
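Conceptually, the recovery mechanism runs a periodic maintenance sweep on every lock server: for each lock that has been held for a while, the server calls back to the node that originated the lock and asks whether that lock is still active; if not, the entry is dropped. A much-simplified sketch under that assumption (the types and callbacks below are illustrative, not the actual implementation in this commit):
```
import "time"

// staleLock describes a lock entry together with the node that originated it.
type staleLock struct {
	name, uid, node string
}

// sweepStaleLocks periodically checks long-lived locks against the node that
// originated them and removes the ones that node no longer considers active.
func sweepStaleLocks(interval time.Duration,
	candidates func() []staleLock,
	stillActive func(l staleLock) (bool, error),
	remove func(l staleLock)) {
	for {
		time.Sleep(interval)
		for _, l := range candidates() {
			active, err := stillActive(l)
			if err != nil {
				continue // originating node unreachable: retry on the next sweep
			}
			if !active {
				remove(l) // the originator no longer holds it: drop the stale entry
			}
		}
	}
}
```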
Known deficiencies
------------------
Known deficiencies can be divided into two categories, namely a) more than one write lock granted and b) lock not becoming available anymore.
### More than one write lock
So far we have identified one case during which this can happen (example for 8 node system):
- 3 nodes are down (say 6, 7, and 8)
- node 1 acquires a lock on "test" (nodes 1 through to 5 giving quorum)
- node 4 and 5 crash (dropping the lock)
- nodes 4 through to 8 restart
- node 4 acquires a lock on "test" (nodes 4 through to 8 giving quorum)
Now we have two concurrent locks on the same resource name which violates the core requirement. Note that if just a single server out of 4 or 5 crashes, we are still fine because the second lock cannot acquire quorum.
This table summarizes the conditions for different configurations during which this can happen:
| Nodes | Down nodes | Crashed nodes | Total nodes |
| -----:| ----------:| -------------:| -----------:|
| 4 | 1 | 2 | 3 |
| 8 | 3 | 2 | 5 |
| 12 | 5 | 2 | 7 |
| 16 | 7 | 2 | 9 |
(for more info see `testMultipleServersOverQuorumDownDuringLockKnownError` in [chaos.go](https://github.com/minio/dsync/blob/master/chaos/chaos.go))
### Lock not available anymore
This would be due to too many stale locks and/or too many servers down (total over `n/2 - 1`). The following table shows the maximum tolerable number for different node sizes:
| Nodes | Max tolerable |
| -----:| -------------:|
| 4 | 1 |
| 8 | 3 |
| 12 | 5 |
| 16 | 7 |
If you see any other shortcomings, we would be interested in hearing about them.
Tackled issues
--------------
* When two nodes want to acquire the same lock at precisely the same time, it is possible for both to just acquire `n/2` locks and there is no majority winner. Both will fail back to their clients and will retry later after a semi-randomized delay.
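The retry delay grows semi-randomly with the number of failed attempts; the formula below is condensed from the retry loop shown earlier in this commit (the wrapper function name is illustrative):
```
import (
	"math"
	"math/rand"
)

// nextBackOff returns the next back-off delay in milliseconds together with the
// updated run counter, growing semi-randomly with the number of failed attempts.
func nextBackOff(backOff, runs int) (int, int) {
	backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
	if backOff > 1024 {
		backOff = backOff % 64 // cap the delay and start a new cycle
		runs = 1
	} else if runs < 10 {
		runs++
	}
	return backOff, runs
}
```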
Server side logic
-----------------
On the server side just the following logic needs to be added (barring some extra error checking):
```
const WriteLock = -1
type lockServer struct {
mutex sync.Mutex
lockMap map[string]int64 // Map of locks, with negative value indicating (exclusive) write lock
// and positive values indicating number of read locks
}
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if _, *reply = l.lockMap[args.Name]; !*reply {
l.lockMap[args.Name] = WriteLock // No locks held on the given name, so claim write lock
}
*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
return nil
}
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, locksHeld)
}
delete(l.lockMap, args.Name) // Remove the write lock
return nil
}
```
If you also want RLock()/RUnlock() functionality, then add this as well:
```
const ReadLock = 1
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply {
l.lockMap[args.Name] = ReadLock // No locks held on the given name, so claim (first) read lock
*reply = true
} else {
if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock
l.lockMap[args.Name] = locksHeld + ReadLock // Grant another read lock
}
}
return nil
}
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name)
}
if locksHeld > ReadLock {
l.lockMap[args.Name] = locksHeld - ReadLock // Remove one of the read locks held
} else {
delete(l.lockMap, args.Name) // Remove the (last) read lock
}
return nil
}
```
See [dsync-server_test.go](https://github.com/fwessels/dsync/blob/master/dsync-server_test.go) for a full implementation.
Sub projects
------------
* See [performance](https://github.com/minio/dsync/tree/master/performance) directory for performance measurements
* See [chaos](https://github.com/minio/dsync/tree/master/chaos) directory for some edge cases
Testing
-------
The full test code (including benchmarks) from `sync/rwmutex_test.go` is used for testing purposes.
Extensions / Other use cases
----------------------------
### Robustness vs Performance
It is possible to trade some level of robustness for overall performance by not contacting each node for every Lock()/Unlock() cycle. In the normal case (example for `n = 16` nodes) a total of 32 RPC messages is sent and the lock is granted if at least a quorum of `n/2 + 1` nodes respond positively. When all nodes are functioning normally this would mean `n = 16` positive responses, that is, `n/2 - 1 = 7` responses over the (minimum) quorum of `n/2 + 1 = 9`. So you could say that this is somewhat overkill: even if 6 nodes are down you still have an extra node over the quorum.
For this case it is possible to reduce the number of nodes to be contacted to, for example, `12`. Instead of 32 RPC messages, now 24 messages will be sent, which is 25% less. As the performance mostly depends on the number of RPC messages sent, the total locks/second handled by all nodes would increase by 33% (given the same CPU load).
You do however want to make sure that you have some sort of 'random' selection of which 12 out of the 16 nodes will participate in every lock. See [here](https://gist.github.com/fwessels/dbbafd537c13ec8f88b360b3a0091ac0) for some sample code that could help with this.
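One way to get such a deterministic, per-lock-name selection is to seed a shuffle with a hash of the lock name, so every node computes the same subset for the same name. This is a hypothetical sketch; the gist linked above shows another approach:
```
import (
	"hash/fnv"
	"math/rand"
)

// selectNodes deterministically picks k node indices out of n for a given
// lock name, so that all nodes agree on which subset participates in the lock.
func selectNodes(lockName string, n, k int) []int {
	h := fnv.New64a()
	h.Write([]byte(lockName))
	r := rand.New(rand.NewSource(int64(h.Sum64())))
	perm := r.Perm(n) // pseudo-random but reproducible permutation of 0..n-1
	return perm[:k]
}
```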
### Scale beyond 16 nodes?
Building on the previous example and depending on how resilient you want to be for outages of nodes, you can also go the other way, namely to increase the total number of nodes while keeping the number of nodes contacted per lock the same.
For instance you could imagine a system of 32 nodes where only a quorum majority of `9` would be needed out of `12` nodes. Again this requires some sort of pseudo-random 'deterministic' selection of 12 nodes out of the total of 32 servers (same [example](https://gist.github.com/fwessels/dbbafd537c13ec8f88b360b3a0091ac0) as above).
Other techniques
----------------
We are well aware that there are more sophisticated systems such as zookeeper, raft, etc. However we found that for our limited use case this was adding too much complexity. So if `dsync` does not meet your requirements then you are probably better off using one of those systems.
Other links that you may find interesting:
- [Distributed locks with Redis](http://redis.io/topics/distlock)
- Based on the above: [Redis-based distributed mutual exclusion lock implementation for Go](https://github.com/hjr265/redsync.go)
Performance of `net/rpc` vs `grpc`
----------------------------------
We did an analysis of the performance of `net/rpc` vs `grpc`, see [here](https://github.com/golang/go/issues/16844#issuecomment-245261755), so we'll stick with `net/rpc` for now.
License
-------

View File

@ -17,6 +17,8 @@
package dsync
import (
cryptorand "crypto/rand"
"fmt"
"log"
"math"
"math/rand"
@ -39,20 +41,32 @@ const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms.
// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
Name string
locks []bool // Array of nodes that granted a lock
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
Name string
writeLocks []string // Array of nodes that granted a write lock
readersLocks [][]string // Array of array of nodes that granted reader locks
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
}
type Granted struct {
index int
locked bool
index int
lockUid string // Locked if set with UID string, unlocked if empty
}
func (g *Granted) isLocked() bool {
return isLocked(g.lockUid)
}
func isLocked(uid string) bool {
return len(uid) > 0
}
type LockArgs struct {
Token string
Timestamp time.Time
Name string
Node string
RPCPath string
UID string
}
func (l *LockArgs) SetToken(token string) {
@ -65,75 +79,59 @@ func (l *LockArgs) SetTimestamp(tstamp time.Time) {
func NewDRWMutex(name string) *DRWMutex {
return &DRWMutex{
Name: name,
locks: make([]bool, dnodeCount),
Name: name,
writeLocks: make([]string, dnodeCount),
}
}
// Lock holds a write lock on dm.
//
// If the lock is already in use, the calling go routine
// blocks until the mutex is available.
func (dm *DRWMutex) Lock() {
isReadLock := false
dm.lockBlocking(isReadLock)
}
// RLock holds a read lock on dm.
//
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
// If one or more read locks are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until the mutex is available.
func (dm *DRWMutex) RLock() {
// Shield RLock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
runs, backOff := 1, 1
for {
// create temp arrays on stack
locks := make([]bool, dnodeCount)
// try to acquire the lock
isReadLock := true
success := lock(clnts, &locks, dm.Name, isReadLock)
if success {
// if success, copy array to object
copy(dm.locks, locks[:])
return
}
// We timed out on the previous lock, incrementally wait for a longer back-off time,
// and try again afterwards
time.Sleep(time.Duration(backOff) * time.Millisecond)
backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
if backOff > 1024 {
backOff = backOff % 64
runs = 1 // reset runs
} else if runs < 10 {
runs++
}
}
isReadLock := true
dm.lockBlocking(isReadLock)
}
// Lock locks dm.
// lockBlocking will acquire either a read or a write lock
//
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (dm *DRWMutex) Lock() {
// Shield Lock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
// The call will block until the lock is granted, using a built-in
// randomized back-off algorithm to keep retrying until successful
func (dm *DRWMutex) lockBlocking(isReadLock bool) {
runs, backOff := 1, 1
for {
// create temp arrays on stack
locks := make([]bool, dnodeCount)
// create temp array on stack
locks := make([]string, dnodeCount)
// try to acquire the lock
isReadLock := false
success := lock(clnts, &locks, dm.Name, isReadLock)
if success {
dm.m.Lock()
defer dm.m.Unlock()
// if success, copy array to object
copy(dm.locks, locks[:])
if isReadLock {
// append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
} else {
copy(dm.writeLocks, locks[:])
}
return
}
@ -154,7 +152,7 @@ func (dm *DRWMutex) Lock() {
// lock tries to acquire the distributed lock, returning true or false
//
func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool {
func lock(clnts []RPC, locks *[]string, lockName string, isReadLock bool) bool {
// Create buffered channel of quorum size
ch := make(chan Granted, dnodeCount)
@ -166,21 +164,29 @@ func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running go routines.
var locked bool
bytesUid := [16]byte{}
cryptorand.Read(bytesUid[:])
uid := fmt.Sprintf("%X", bytesUid[:])
args := LockArgs{Name: lockName, Node: clnts[ownNode].Node(), RPCPath: clnts[ownNode].RPCPath(), UID: uid}
if isReadLock {
if err := c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &locked); err != nil {
if err := c.Call("Dsync.RLock", &args, &locked); err != nil {
if dsyncLog {
log.Println("Unable to call Dsync.RLock", err)
}
}
} else {
if err := c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &locked); err != nil {
if err := c.Call("Dsync.Lock", &args, &locked); err != nil {
if dsyncLog {
log.Println("Unable to call Dsync.Lock", err)
}
}
}
ch <- Granted{index: index, locked: locked}
g := Granted{index: index}
if locked {
g.lockUid = args.UID
}
ch <- g
}(index, isReadLock, c)
}
@ -196,16 +202,16 @@ func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool {
done := false
timeout := time.After(DRWMutexAcquireTimeout)
for ; i < dnodeCount; i++ { // Loop until we acquired all locks
for ; i < dnodeCount; i++ { // Loop until we acquired all locks
select {
case grant := <-ch:
if grant.locked {
if grant.isLocked() {
// Mark that this node has acquired the lock
(*locks)[grant.index] = true
(*locks)[grant.index] = grant.lockUid
} else {
locksFailed++
if locksFailed > dnodeCount - dquorum {
if locksFailed > dnodeCount-dquorum {
// We know that we are not going to get the lock anymore, so exit out
// and release any locks that did get acquired
done = true
@ -238,24 +244,31 @@ func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool {
// already has been unlocked again by the original calling thread)
for ; i < dnodeCount; i++ {
grantToBeReleased := <-ch
if grantToBeReleased.locked {
if grantToBeReleased.isLocked() {
// release lock
sendRelease(clnts[grantToBeReleased.index], lockName, isReadLock)
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUid, isReadLock)
}
}
}(isReadLock)
wg.Wait()
// Verify that localhost server is actively participating in the lock (the lock maintenance relies on this fact)
if quorum && !isLocked((*locks)[ownNode]) {
// If not, release lock (and try again later)
releaseAll(clnts, locks, lockName, isReadLock)
quorum = false
}
return quorum
}
// quorumMet determines whether we have acquired n/2+1 underlying locks or not
func quorumMet(locks *[]bool) bool {
func quorumMet(locks *[]string) bool {
count := 0
for _, locked := range *locks {
if locked {
for _, uid := range *locks {
if isLocked(uid) {
count++
}
}
@ -264,65 +277,89 @@ func quorumMet(locks *[]bool) bool {
}
// releaseAll releases all locks that are marked as locked
func releaseAll(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) {
func releaseAll(clnts []RPC, locks *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < dnodeCount; lock++ {
if (*locks)[lock] {
sendRelease(clnts[lock], lockName, isReadLock)
(*locks)[lock] = false
if isLocked((*locks)[lock]) {
sendRelease(clnts[lock], lockName, (*locks)[lock], isReadLock)
(*locks)[lock] = ""
}
}
}
// Unlock unlocks the write lock.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DRWMutex) Unlock() {
// create temp array on stack
locks := make([]string, dnodeCount)
{
dm.m.Lock()
defer dm.m.Unlock()
// Check if minimally a single entry is set in the writeLocks array
lockFound := false
for _, uid := range dm.writeLocks {
if isLocked(uid) {
lockFound = true
break
}
}
if !lockFound {
panic("Trying to Unlock() while no Lock() is active")
}
// Copy writelocks to stack array
copy(locks, dm.writeLocks[:])
}
isReadLock := false
unlock(&locks, dm.Name, isReadLock)
}
// RUnlock releases a read lock held on dm.
//
// It is a run-time error if dm is not locked on entry to RUnlock.
func (dm *DRWMutex) RUnlock() {
// We don't panic like sync.Mutex, when an unlock is issued on an
// un-locked lock, since the lock rpc server may have restarted and
// "forgotten" about the lock.
// We don't need to wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get
// quorum)
for index, c := range clnts {
// create temp array on stack
locks := make([]string, dnodeCount)
if dm.locks[index] {
// broadcast lock release to all nodes that granted the lock
isReadLock := true
sendRelease(c, dm.Name, isReadLock)
dm.locks[index] = false
{
dm.m.Lock()
defer dm.m.Unlock()
if len(dm.readersLocks) == 0 {
panic("Trying to RUnlock() while no RLock() is active")
}
// Copy out first element to release it first (FIFO)
copy(locks, dm.readersLocks[0][:])
// Drop first element from array
dm.readersLocks = dm.readersLocks[1:]
}
isReadLock := true
unlock(&locks, dm.Name, isReadLock)
}
// Unlock unlocks dm.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DRWMutex) Unlock() {
func unlock(locks *[]string, name string, isReadLock bool) {
// We don't panic like sync.Mutex, when an unlock is issued on an
// un-locked lock, since the lock rpc server may have restarted and
// "forgotten" about the lock.
// We don't need to synchronously wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get quorum)
// We don't need to wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get
// quorum)
for index, c := range clnts {
if dm.locks[index] {
if isLocked((*locks)[index]) {
// broadcast lock release to all nodes that granted the lock
isReadLock := false
sendRelease(c, dm.Name, isReadLock)
sendRelease(c, name, (*locks)[index], isReadLock)
dm.locks[index] = false
(*locks)[index] = ""
}
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c RPC, name string, isReadLock bool) {
func sendRelease(c RPC, name, uid string, isReadLock bool) {
backOffArray := []time.Duration{
30 * time.Second, // 30secs.
@ -340,9 +377,9 @@ func sendRelease(c RPC, name string, isReadLock bool) {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
var unlocked bool
args := LockArgs{Name: name, UID: uid} // Just send name & uid (and leave out node and rpcPath; unimportant for unlocks)
if isReadLock {
if err := c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &unlocked); err == nil {
if err := c.Call("Dsync.RUnlock", &args, &unlocked); err == nil {
// RUnlock delivered, exit out
return
} else if err != nil {
@ -355,7 +392,7 @@ func sendRelease(c RPC, name string, isReadLock bool) {
}
}
} else {
if err := c.Call("Dsync.Unlock", &LockArgs{Name: name}, &unlocked); err == nil {
if err := c.Call("Dsync.Unlock", &args, &unlocked); err == nil {
// Unlock delivered, exit out
return
} else if err != nil {
@ -374,3 +411,14 @@ func sendRelease(c RPC, name string, isReadLock bool) {
}
}(c, name)
}
// DRLocker returns a sync.Locker interface that implements
// the Lock and Unlock methods by calling dm.RLock and dm.RUnlock.
func (dm *DRWMutex) DRLocker() sync.Locker {
return (*drlocker)(dm)
}
type drlocker DRWMutex
func (dr *drlocker) Lock() { (*DRWMutex)(dr).RLock() }
func (dr *drlocker) Unlock() { (*DRWMutex)(dr).RUnlock() }

View File

@ -29,13 +29,16 @@ var dnodeCount int
// List of rpc client objects, one per lock server.
var clnts []RPC
// Index into rpc client array for server running on localhost
var ownNode int
// Simple majority based quorum, set to dNodeCount/2+1
var dquorum int
// SetNodesWithClients - initializes package-level global state variables such as clnts.
// N.B. - This function should be called only once inside any program that uses
// dsync.
func SetNodesWithClients(rpcClnts []RPC) (err error) {
func SetNodesWithClients(rpcClnts []RPC, rpcOwnNode int) (err error) {
// Validate if number of nodes is within allowable range.
if dnodeCount != 0 {
@ -46,10 +49,17 @@ func SetNodesWithClients(rpcClnts []RPC) (err error) {
return errors.New("Dsync not designed for more than 16 nodes")
}
if rpcOwnNode > len(rpcClnts) {
return errors.New("Index for own node is too large")
}
dnodeCount = len(rpcClnts)
dquorum = dnodeCount/2 + 1
// Initialize node name and rpc path for each RPCClient object.
clnts = make([]RPC, dnodeCount)
copy(clnts, rpcClnts)
ownNode = rpcOwnNode
return nil
}

View File

@ -24,5 +24,7 @@ type RPC interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}, reply interface{}) error
Node() string
RPCPath() string
Close() error
}

vendor/vendor.json
View File

@ -98,10 +98,10 @@
"revisionTime": "2015-11-18T20:00:48-08:00"
},
{
"checksumSHA1": "0raNaLP/AhxXhEeB5CdSnbED3O4=",
"checksumSHA1": "0wc4gxRamjyjVSiZF73KIaCRC6k=",
"path": "github.com/minio/dsync",
"revision": "1f615ccd013d35489becfe710e0ba7dce98b59e5",
"revisionTime": "2016-08-29T17:06:27Z"
"revision": "531818cd6458e24e30eec2c67136a8bcb24520af",
"revisionTime": "2016-09-15T18:37:01Z"
},
{
"path": "github.com/minio/go-homedir",