Vendorize minio/dsync for server-side read lock (#2484)

- Prevention of stale lock accumulation.
- Removal of dead code.
Krishnan Parthasarathi 2016-08-17 18:44:33 -07:00 committed by Harshavardhana
parent 1f67c18222
commit c33d1b8ee6
5 changed files with 335 additions and 421 deletions

@@ -1,319 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"log"
"math"
"math/rand"
"net/rpc"
"sync"
"time"
)
const DMutexAcquireTimeout = 25 * time.Millisecond
// A DMutex is a distributed mutual exclusion lock.
type DMutex struct {
Name string
locks []bool // Array of nodes that granted a lock
uids []string // Array of uids for verification of sending correct release messages
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
}
type Granted struct {
index int
locked bool
uid string
}
// Connect to respective lock server nodes on the first Lock() call.
func connectLazy(dm *DMutex) {
if clnts == nil {
panic("rpc client connections weren't initialized.")
}
for i := range clnts {
if clnts[i].rpc != nil {
continue
}
// Pass in unique path (as required by server.HandleHTTP()).
// Ignore failure to connect, the lock server node may join the
// cluster later.
clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
if err != nil {
clnts[i].SetRPC(nil)
continue
}
clnts[i].SetRPC(clnt)
}
}
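connectLazy dials each lock server over net/rpc's HTTP transport and simply skips nodes that are not reachable yet, so a node can join the cluster later. Below is a minimal, hypothetical sketch (not part of this commit) of that dial-and-call pattern; the address, RPC path, and lock name are placeholders.
package main
import (
	"fmt"
	"log"
	"net/rpc"
)
func main() {
	// Placeholder node address and per-server RPC path.
	clnt, err := rpc.DialHTTPPath("tcp", "10.0.0.1:9000", "/rpc/dsync")
	if err != nil {
		// connectLazy ignores this and retries on a later Lock()/Unlock();
		// here we just report it and bail out.
		log.Printf("lock server not reachable yet: %v", err)
		return
	}
	defer clnt.Close()
	// Same RPC the package issues when acquiring a lock.
	var status bool
	if err := clnt.Call("Dsync.Lock", "my-resource", &status); err != nil {
		log.Printf("lock RPC failed: %v", err)
		return
	}
	fmt.Println("lock granted:", status)
}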
// Lock locks dm.
//
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (dm *DMutex) Lock() {
// Shield Lock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
runs, backOff := 1, 1
for {
connectLazy(dm)
// create temp arrays on stack
locks := make([]bool, n)
ids := make([]string, n)
// try to acquire the lock
success := lock(clnts, &locks, &ids, dm.Name)
if success {
// if success, copy array to object
dm.locks = make([]bool, n)
copy(dm.locks, locks[:])
dm.uids = make([]string, n)
copy(dm.uids, ids[:])
return
}
// We timed out on the previous lock, incrementally wait for a longer back-off time,
// and try again afterwards
time.Sleep(time.Duration(backOff) * time.Millisecond)
backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
if backOff > 1024 {
backOff = backOff % 64
runs = 1 // reset runs
} else if runs < 10 {
runs++
}
}
}
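The retry loop above sleeps a randomized, roughly exponentially growing number of milliseconds between attempts and wraps the delay back down once it exceeds 1024 ms. A small standalone sketch (not part of this commit) of just that schedule:
package main
import (
	"fmt"
	"math"
	"math/rand"
)
func main() {
	runs, backOff := 1, 1
	for attempt := 1; attempt <= 15; attempt++ {
		fmt.Printf("attempt %2d: sleep %4d ms\n", attempt, backOff)
		// Same arithmetic as Lock(): add a random fraction of 2^runs ms,
		// then wrap back to a small value once the delay exceeds 1024 ms.
		backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
		if backOff > 1024 {
			backOff = backOff % 64
			runs = 1 // reset runs
		} else if runs < 10 {
			runs++
		}
	}
}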
func (dm *DMutex) tryLockTimeout() bool {
// Shield Lock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
// TODO: Implement reconnect
connectLazy(dm)
// create temp arrays on stack
locks := make([]bool, n)
ids := make([]string, n)
// try to acquire the lock
success := lock(clnts, &locks, &ids, dm.Name)
if success {
// if success, copy array to object
dm.locks = make([]bool, n)
copy(dm.locks, locks[:])
dm.uids = make([]string, n)
copy(dm.uids, ids[:])
}
return success
}
// lock tries to acquire the distributed lock, returning true if a quorum of nodes granted it.
func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string) bool {
// Create buffered channel of quorum size
ch := make(chan Granted, n/2+1)
for index, c := range clnts {
// broadcast lock request to all nodes
go func(index int, c *RPCClient) {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
var status bool
err := c.Call("Dsync.Lock", lockName, &status)
locked, uid := false, ""
if err == nil {
locked = status
// TODO: Get UID again
uid = ""
} else {
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
if IsRPCError(err) {
clnts[index].SetRPC(nil)
}
// silently ignore error, retry later
}
ch <- Granted{index: index, locked: locked, uid: uid}
}(index, c)
}
var wg sync.WaitGroup
wg.Add(1)
quorum := false
go func() {
// Wait until we have received (minimally) quorum number of responses or timeout
i := 0
done := false
timeout := time.After(DMutexAcquireTimeout)
for ; i < n; i++ {
select {
case grant := <-ch:
if grant.locked {
// Mark that this node has acquired the lock
(*locks)[grant.index] = true
(*uids)[grant.index] = grant.uid
} else {
done = true
//fmt.Println("one lock failed before quorum -- release locks acquired")
releaseAll(clnts, locks, uids, lockName)
}
case <-timeout:
done = true
// timeout happened, maybe one of the nodes is slow, count
// number of locks to check whether we have quorum or not
if !quorumMet(locks) {
//fmt.Println("timed out -- release locks acquired")
releaseAll(clnts, locks, uids, lockName)
}
}
if done {
break
}
}
// Count locks in order to determine whether we have quorum or not
quorum = quorumMet(locks)
// Signal that we have the quorum
wg.Done()
// Wait for the other responses and immediately release the locks
// (do not add them to the locks array because the DMutex could
// already have been unlocked again by the original calling thread)
for ; i < n; i++ {
grantToBeReleased := <-ch
if grantToBeReleased.locked {
// release lock
go sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid)
}
}
}()
wg.Wait()
return quorum
}
// quorumMet determines whether we have acquired n/2+1 underlying locks or not
func quorumMet(locks *[]bool) bool {
count := 0
for _, locked := range *locks {
if locked {
count++
}
}
return count >= n/2+1
}
// releaseAll releases all locks that are marked as locked
func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string) {
for lock := 0; lock < n; lock++ {
if (*locks)[lock] {
go sendRelease(clnts[lock], lockName, (*ids)[lock])
(*locks)[lock] = false
(*ids)[lock] = ""
}
}
}
// hasLock returns whether or not a node participated in granting the lock
func (dm *DMutex) hasLock(node string) bool {
for index, n := range nodes {
if n == node {
return dm.locks[index]
}
}
return false
}
// locked returns whether or not we have met the quorum
func (dm *DMutex) locked() bool {
locks := make([]bool, n)
copy(locks[:], dm.locks[:])
return quorumMet(&locks)
}
// Unlock unlocks dm.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DMutex) Unlock() {
// Verify that we have the lock or panic otherwise (similar to sync.mutex)
if !dm.locked() {
panic("dsync: unlock of unlocked distributed mutex")
}
// We don't need to wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get
// quorum)
for index, c := range clnts {
if dm.locks[index] {
// broadcast lock release to all nodes that granted the lock
go sendRelease(c, dm.Name, dm.uids[index])
dm.locks[index] = false
}
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c *RPCClient, name, uid string) {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
var status bool
// TODO: Send UID to server
if err := c.Call("Dsync.Unlock", name, &status); err != nil {
log.Fatalf("Unlock on %s failed on client %v", name, c)
}
}
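Both the removed DMutex and the new DRWMutex decide success by a simple majority: with n lock servers, at least n/2+1 of them must grant the request. A tiny illustrative program (not part of this commit) showing the rule for a 4-node setup, where the quorum is 3:
package main
import "fmt"
// quorumMet mirrors the rule used above: success needs n/2+1 grants.
func quorumMet(locks []bool, n int) bool {
	count := 0
	for _, locked := range locks {
		if locked {
			count++
		}
	}
	return count >= n/2+1
}
func main() {
	const n = 4
	fmt.Println(quorumMet([]bool{true, true, true, false}, n))  // true:  3 >= 3
	fmt.Println(quorumMet([]bool{true, true, false, false}, n)) // false: 2 <  3
}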

@@ -17,127 +17,360 @@
package dsync
import (
"fmt"
"math"
"math/rand"
"net/rpc"
"sync"
"time"
)
const maxReaders = 8
const DRWMutexAcquireTimeout = 25 * time.Millisecond
// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
rArray []*DMutex
rLockedArray []bool
w DMutex // held if there are pending writers
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
m2 sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
Name string
locks []bool // Array of nodes that granted a lock
uids []string // Array of uids for verification of sending correct release messages
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
}
func NewDRWMutex(name string) (drw *DRWMutex) {
rArray := make([]*DMutex, maxReaders)
rLockedArray := make([]bool, maxReaders)
for r := 0; r < maxReaders; r++ {
rArray[r] = &DMutex{Name: fmt.Sprintf("%s-r%d", name, r)}
}
type Granted struct {
index int
locked bool
uid string
}
func NewDRWMutex(name string) *DRWMutex {
return &DRWMutex{
rArray: rArray,
rLockedArray: rLockedArray,
w: DMutex{Name: name + "-w"}}
Name: name,
locks: make([]bool, dnodeCount),
uids: make([]string, dnodeCount),
}
}
// RLock locks drw for reading.
func (drw *DRWMutex) RLock() {
// Connect to respective lock server nodes on the first Lock() call.
func connectLazy() {
if clnts == nil {
panic("rpc client connections weren't initialized.")
}
for i := range clnts {
if clnts[i].rpc != nil {
continue
}
drw.m.Lock()
defer drw.m.Unlock()
// Pass in unique path (as required by server.HandleHTTP()).
// Ignore failure to connect, the lock server node may join the
// cluster later.
clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
if err != nil {
clnts[i].SetRPC(nil)
continue
}
clnts[i].SetRPC(clnt)
}
}
// Check if no write is active, block otherwise
// Can skip this?
drw.w.Lock()
drw.w.Unlock()
// RLock holds a read lock on dm.
//
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (dm *DRWMutex) RLock() {
// Shield RLock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
// Lock either one of the reader locks
for i := 0; ; i++ {
drw.rLockedArray[i%maxReaders] = drw.rArray[i%maxReaders].tryLockTimeout()
if drw.rLockedArray[i%maxReaders] {
runs, backOff := 1, 1
for {
connectLazy()
// create temp arrays on stack
locks := make([]bool, dnodeCount)
ids := make([]string, dnodeCount)
// try to acquire the lock
isReadLock := true
success := lock(clnts, &locks, &ids, dm.Name, isReadLock)
if success {
// if success, copy array to object
copy(dm.locks, locks[:])
copy(dm.uids, ids[:])
return
}
}
}
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (drw *DRWMutex) RUnlock() {
// We timed out on the previous lock, incrementally wait for a longer back-off time,
// and try again afterwards
time.Sleep(time.Duration(backOff) * time.Millisecond)
drw.m.Lock()
defer drw.m.Unlock()
backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
if backOff > 1024 {
backOff = backOff % 64
// Unlock whichever read lock was acquired
for r := 0; r < maxReaders; r++ {
if drw.rLockedArray[r] {
drw.rArray[r].Unlock()
drw.rLockedArray[r] = false
// we only want to release a single read lock at a time
break
runs = 1 // reset runs
} else if runs < 10 {
runs++
}
}
}
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
// To ensure that the lock eventually becomes available,
// a blocked Lock call excludes new readers from acquiring
// the lock.
func (drw *DRWMutex) Lock() {
// Lock locks dm.
//
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (dm *DRWMutex) Lock() {
drw.m.Lock()
defer drw.m.Unlock()
// Shield Lock() with local mutex in order to prevent more than
// one broadcast going out at the same time from this node
dm.m.Lock()
defer dm.m.Unlock()
// First, resolve competition with other writers.
drw.w.Lock()
runs, backOff := 1, 1
// Acquire all read locks.
var wg sync.WaitGroup
wg.Add(maxReaders)
for {
connectLazy()
for r := 0; r < maxReaders; r++ {
go func(r int) {
defer wg.Done()
drw.rArray[r].Lock()
drw.rLockedArray[r] = true
}(r)
// create temp arrays on stack
locks := make([]bool, dnodeCount)
ids := make([]string, dnodeCount)
// try to acquire the lock
isReadLock := false
success := lock(clnts, &locks, &ids, dm.Name, isReadLock)
if success {
// if success, copy array to object
copy(dm.locks, locks[:])
copy(dm.uids, ids[:])
return
}
// We timed out on the previous lock, incrementally wait for a longer back-off time,
// and try again afterwards
time.Sleep(time.Duration(backOff) * time.Millisecond)
backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
if backOff > 1024 {
backOff = backOff % 64
runs = 1 // reset runs
} else if runs < 10 {
runs++
}
}
}
// lock tries to acquire the distributed lock, returning true if a quorum of nodes granted it.
func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool {
// Create buffered channel of quorum size
ch := make(chan Granted, dquorum)
for index, c := range clnts {
// broadcast lock request to all nodes
go func(index int, isReadLock bool, c *RPCClient) {
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
var status bool
var err error
if isReadLock {
err = c.Call("Dsync.RLock", lockName, &status)
} else {
err = c.Call("Dsync.Lock", lockName, &status)
}
locked, uid := false, ""
if err == nil {
locked = status
// TODO: Get UID again
uid = ""
} else {
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
if IsRPCError(err) {
clnts[index].SetRPC(nil)
}
// silently ignore error, retry later
}
ch <- Granted{index: index, locked: locked, uid: uid}
}(index, isReadLock, c)
}
var wg sync.WaitGroup
wg.Add(1)
quorum := false
go func(isReadLock bool) {
// Wait until we have received (minimally) quorum number of responses or timeout
i := 0
done := false
timeout := time.After(DRWMutexAcquireTimeout)
for ; i < dnodeCount; i++ {
select {
case grant := <-ch:
if grant.locked {
// Mark that this node has acquired the lock
(*locks)[grant.index] = true
(*uids)[grant.index] = grant.uid
} else {
done = true
//fmt.Println("one lock failed before quorum -- release locks acquired")
releaseAll(clnts, locks, uids, lockName, isReadLock)
}
case <-timeout:
done = true
// timeout happened, maybe one of the nodes is slow, count
// number of locks to check whether we have quorum or not
if !quorumMet(locks) {
//fmt.Println("timed out -- release locks acquired")
releaseAll(clnts, locks, uids, lockName, isReadLock)
}
}
if done {
break
}
}
// Count locks in order to determine whether we have quorum or not
quorum = quorumMet(locks)
// Signal that we have the quorum
wg.Done()
// Wait for the other responses and immediately release the locks
// (do not add them to the locks array because the DRWMutex could
// already have been unlocked again by the original calling thread)
for ; i < dnodeCount; i++ {
grantToBeReleased := <-ch
if grantToBeReleased.locked {
// release lock
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid, isReadLock)
}
}
}(isReadLock)
wg.Wait()
return quorum
}
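The goroutine in lock() blocks the caller only until a quorum decision can be made, then keeps receiving so that late responders are still accounted for (and any locks they granted are released). A reduced, self-contained sketch (not part of this commit) of that buffered-channel pattern; node count, latencies, and the everything-succeeds behaviour are made up:
package main
import (
	"fmt"
	"sync"
	"time"
)
func main() {
	const nodeCount = 4
	const quorum = nodeCount/2 + 1
	ch := make(chan bool, quorum) // buffered channel of quorum size, as in lock()
	for i := 0; i < nodeCount; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i*10) * time.Millisecond) // simulated RPC latency
			ch <- true                                         // simulated grant
		}(i)
	}
	var wg sync.WaitGroup
	wg.Add(1)
	quorumMet := false
	go func() {
		granted, consumed := 0, 0
		for ; consumed < nodeCount; consumed++ {
			if <-ch {
				granted++
			}
			if granted >= quorum {
				consumed++
				break // decision possible without waiting for everyone
			}
		}
		quorumMet = granted >= quorum
		wg.Done() // unblock the caller as soon as the outcome is known
		// Keep draining late responses so no sender stays blocked;
		// the real code also releases any locks granted after this point.
		for ; consumed < nodeCount; consumed++ {
			<-ch
		}
	}()
	wg.Wait()
	fmt.Println("quorum met:", quorumMet)
}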
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) an RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (drw *DRWMutex) Unlock() {
// quorumMet determines whether we have acquired n/2+1 underlying locks or not
func quorumMet(locks *[]bool) bool {
drw.m.Lock()
defer drw.m.Unlock()
for r := 0; r < maxReaders; r++ {
if !drw.rLockedArray[r] {
panic("dsync: unlock of unlocked distributed rwmutex")
count := 0
for _, locked := range *locks {
if locked {
count++
}
}
// Unlock all read locks
for r := 0; r < maxReaders; r++ {
drw.rArray[r].Unlock()
drw.rLockedArray[r] = false
return count >= dquorum
}
// releaseAll releases all locks that are marked as locked
func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < dnodeCount; lock++ {
if (*locks)[lock] {
sendRelease(clnts[lock], lockName, (*ids)[lock], isReadLock)
(*locks)[lock] = false
(*ids)[lock] = ""
}
}
// Allow other writers to proceed.
drw.w.Unlock()
}
// RUnlock releases a read lock held on dm.
//
// It is a run-time error if dm is not locked on entry to RUnlock.
func (dm *DRWMutex) RUnlock() {
// Unlike sync.Mutex, we don't panic when an unlock is issued on an
// unlocked lock, since the lock rpc server may have restarted and
// "forgotten" about the lock.
// We don't need to wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get
// quorum)
for index, c := range clnts {
if dm.locks[index] {
// broadcast lock release to all nodes that granted the lock
isReadLock := true
sendRelease(c, dm.Name, dm.uids[index], isReadLock)
dm.locks[index] = false
}
}
}
// Unlock unlocks dm.
//
// It is a run-time error if dm is not locked on entry to Unlock.
func (dm *DRWMutex) Unlock() {
// Unlike sync.Mutex, we don't panic when an unlock is issued on an
// unlocked lock, since the lock rpc server may have restarted and
// "forgotten" about the lock.
// We don't need to wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get
// quorum)
for index, c := range clnts {
if dm.locks[index] {
// broadcast lock release to all nodes that granted the lock
isReadLock := false
sendRelease(c, dm.Name, dm.uids[index], isReadLock)
dm.locks[index] = false
}
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c *RPCClient, name, uid string, isReadLock bool) {
backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour}
go func(c *RPCClient, name, uid string) {
for _, backOff := range backOffArray {
// Make sure we are connected
connectLazy()
// All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines.
var status bool
var err error
// TODO: Send UID to server
if isReadLock {
if err = c.Call("Dsync.RUnlock", name, &status); err == nil {
// RUnlock delivered, exit out
return
}
} else {
if err = c.Call("Dsync.Unlock", name, &status); err == nil {
// Unlock delivered, exit out
return
}
}
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
c.SetRPC(nil)
// wait
time.Sleep(backOff)
}
}(c, name, uid)
}
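For reference, this is roughly how a caller of the vendored package would presumably use the new distributed read/write lock once dsync has been initialized (see SetNodesWithPath below). The import path matches vendor.json; the resource name is a placeholder, and this sketch is not part of the commit.
package main
import "github.com/minio/dsync"
func main() {
	// Assumes dsync.SetNodesWithPath(...) has already been called with the
	// participating lock servers.
	rw := dsync.NewDRWMutex("bucket/object") // placeholder resource name
	rw.RLock() // blocks until a quorum of lock servers grants the read lock
	// ... read the protected resource ...
	rw.RUnlock()
	rw.Lock() // write path: blocks until a quorum grants the write lock
	// ... modify the protected resource ...
	rw.Unlock()
}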

@@ -23,16 +23,20 @@ const DebugPath = "/debug"
const DefaultPath = "/rpc/dsync"
var n int
// Number of nodes participating in the distributed locking.
var dnodeCount int
// List of nodes participating.
var nodes []string
// List of rpc paths, one per lock server.
var rpcPaths []string
// List of rpc client objects, one per lock server.
var clnts []*RPCClient
func closeClients(clients []*RPCClient) {
for _, clnt := range clients {
clnt.Close()
}
}
// Simple majority-based quorum, set to dnodeCount/2+1
var dquorum int
// SetNodesWithPath - initializes package-level global state variables such as
// nodes, rpcPaths, clnts.
@@ -41,7 +45,7 @@ func closeClients(clients []*RPCClient) {
func SetNodesWithPath(nodeList []string, paths []string) (err error) {
// Validate if number of nodes is within allowable range.
if n != 0 {
if dnodeCount != 0 {
return errors.New("Cannot reinitialize dsync package")
} else if len(nodeList) < 4 {
return errors.New("Dsync not designed for less than 4 nodes")
@@ -53,8 +57,9 @@ func SetNodesWithPath(nodeList []string, paths []string) (err error) {
copy(nodes, nodeList[:])
rpcPaths = make([]string, len(paths))
copy(rpcPaths, paths[:])
n = len(nodes)
clnts = make([]*RPCClient, n)
dnodeCount = len(nodes)
dquorum = dnodeCount/2 + 1
clnts = make([]*RPCClient, dnodeCount)
// Initialize node name and rpc path for each RPCClient object.
for i := range clnts {
clnts[i] = newClient(nodes[i], rpcPaths[i])
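
A hypothetical initialization sketch (not part of this commit): SetNodesWithPath rejects fewer than four nodes, and with four nodes dquorum becomes 4/2+1 = 3. Addresses and RPC paths below are placeholders.
package main
import (
	"log"

	"github.com/minio/dsync"
)
func main() {
	nodes := []string{"10.0.0.1:9000", "10.0.0.2:9000", "10.0.0.3:9000", "10.0.0.4:9000"}
	paths := []string{"/rpc/dsync", "/rpc/dsync", "/rpc/dsync", "/rpc/dsync"}
	if err := dsync.SetNodesWithPath(nodes, paths); err != nil {
		log.Fatalln("dsync init failed:", err)
	}
	// From here on, NewDRWMutex(name) uses the package-level clnts,
	// dnodeCount and dquorum that were just set up.
}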

@@ -43,11 +43,6 @@ func (rpcClient *RPCClient) SetRPC(rpc *rpc.Client) {
defer rpcClient.Unlock()
rpcClient.rpc = rpc
}
func (rpcClient *RPCClient) Close() error {
rpcClient.Lock()
defer rpcClient.Unlock()
return rpcClient.rpc.Close()
}
func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
rpcClient.Lock()

vendor/vendor.json
@@ -98,10 +98,10 @@
"revisionTime": "2015-11-18T20:00:48-08:00"
},
{
"checksumSHA1": "Ev8FdU+RSmpHQsLGzRpg5/ka7zE=",
"checksumSHA1": "kbVCnnU0gR/i8WA8Gs2I+/7kONY=",
"path": "github.com/minio/dsync",
"revision": "b26292b87d023da097193c8fe624d4a159e0fd03",
"revisionTime": "2016-08-11T06:53:13Z"
"revision": "8f4819554f1f4fffc2e1c8c706b23e5c844997f4",
"revisionTime": "2016-08-17T23:34:37Z"
},
{
"path": "github.com/minio/go-homedir",