New dsync and added ForceUnlock to lock rpc server (#2956)

* Update dsync and added ForceUnlock function
* Added test cases for ForceUnlock
This commit is contained in:
Frank
2016-10-17 10:53:29 +02:00
committed by Harshavardhana
parent d02cb963d5
commit ea406754a6
4 changed files with 131 additions and 14 deletions

View File

@@ -124,7 +124,7 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) {
// if success, copy array to object
if isReadLock {
// append new array of bools at the end
// append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
@@ -315,12 +315,14 @@ func (dm *DRWMutex) Unlock() {
panic("Trying to Unlock() while no Lock() is active")
}
// Copy writelocks to stack array
// Copy write locks to stack array
copy(locks, dm.writeLocks[:])
// Clear write locks array
dm.writeLocks = make([]string, dnodeCount)
}
isReadLock := false
unlock(&locks, dm.Name, isReadLock)
unlock(locks, dm.Name, isReadLock)
}
// RUnlock releases a read lock held on dm.
@@ -344,25 +346,42 @@ func (dm *DRWMutex) RUnlock() {
}
isReadLock := true
unlock(&locks, dm.Name, isReadLock)
unlock(locks, dm.Name, isReadLock)
}
func unlock(locks *[]string, name string, isReadLock bool) {
func unlock(locks []string, name string, isReadLock bool) {
// We don't need to synchronously wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get quorum)
for index, c := range clnts {
if isLocked((*locks)[index]) {
// broadcast lock release to all nodes the granted the lock
sendRelease(c, name, (*locks)[index], isReadLock)
(*locks)[index] = ""
if isLocked(locks[index]) {
// broadcast lock release to all nodes that granted the lock
sendRelease(c, name, locks[index], isReadLock)
}
}
}
// ForceUnlock will forcefully clear a write or read lock.
func (dm *DRWMutex) ForceUnlock() {
{
dm.m.Lock()
defer dm.m.Unlock()
// Clear write locks array
dm.writeLocks = make([]string, dnodeCount)
// Clear read locks array
dm.readersLocks = nil
}
for _, c := range clnts {
// broadcast lock release to all nodes that granted the lock
sendRelease(c, dm.Name, "", false)
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c RPC, name, uid string, isReadLock bool) {
@@ -383,7 +402,20 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) {
// i.e. it is safe to call them from multiple concurrently running goroutines.
var unlocked bool
args := LockArgs{Name: name, UID: uid} // Just send name & uid (and leave out node and rpcPath; unimportant for unlocks)
if isReadLock {
if len(uid) == 0 {
if err := c.Call("Dsync.ForceUnlock", &args, &unlocked); err == nil {
// ForceUnlock delivered, exit out
return
} else if err != nil {
if dsyncLog {
log.Println("Unable to call Dsync.ForceUnlock", err)
}
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
// ForceUnlock possibly failed with server timestamp mismatch, server may have restarted.
return
}
}
} else if isReadLock {
if err := c.Call("Dsync.RUnlock", &args, &unlocked); err == nil {
// RUnlock delivered, exit out
return