lock: Fix support single node XL locking as well.

This commit is contained in:
Harshavardhana 2016-08-29 16:13:54 -07:00
parent f655592ff5
commit bca1385683

View File

@ -109,7 +109,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
} }
nsLk.ref++ // Update ref count here to avoid multiple races. nsLk.ref++ // Update ref count here to avoid multiple races.
rwlock := nsLk.writer rwlock := nsLk.writer
if readLock { if readLock && n.isDist {
rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path)) rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path))
} }
// Unlock map before Locking NS which might block. // Unlock map before Locking NS which might block.
@ -119,6 +119,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
if readLock { if readLock {
rwlock.RLock() rwlock.RLock()
if n.isDist {
// Only add (for reader case) to array after RLock() succeeds // Only add (for reader case) to array after RLock() succeeds
// (so that we know for sure that element in [0] can be RUnlocked()) // (so that we know for sure that element in [0] can be RUnlocked())
n.lockMapMutex.Lock() n.lockMapMutex.Lock()
@ -128,7 +129,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
nsLk.readerArray = append(nsLk.readerArray, rwlock) nsLk.readerArray = append(nsLk.readerArray, rwlock)
} }
n.lockMapMutex.Unlock() n.lockMapMutex.Unlock()
}
} else { } else {
rwlock.Lock() rwlock.Lock()
} }
@ -143,6 +144,7 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
param := nsParam{volume, path} param := nsParam{volume, path}
if nsLk, found := n.lockMap[param]; found { if nsLk, found := n.lockMap[param]; found {
if readLock { if readLock {
if n.isDist {
if len(nsLk.readerArray) == 0 { if len(nsLk.readerArray) == 0 {
errorIf(errors.New("Length of reader lock array cannot be 0."), "Invalid reader lock array length detected.") errorIf(errors.New("Length of reader lock array cannot be 0."), "Invalid reader lock array length detected.")
} }
@ -150,6 +152,9 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
nsLk.readerArray[0].RUnlock() nsLk.readerArray[0].RUnlock()
// And discard first element // And discard first element
nsLk.readerArray = nsLk.readerArray[1:] nsLk.readerArray = nsLk.readerArray[1:]
} else {
nsLk.writer.RUnlock()
}
} else { } else {
nsLk.writer.Unlock() nsLk.writer.Unlock()
} }
@ -160,7 +165,7 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) {
nsLk.ref-- nsLk.ref--
} }
if nsLk.ref == 0 { if nsLk.ref == 0 {
if len(nsLk.readerArray) != 0 { if len(nsLk.readerArray) != 0 && n.isDist {
errorIf(errors.New("Length of reader lock array should be 0 upon deleting map entry."), "Invalid reader lock array length detected.") errorIf(errors.New("Length of reader lock array should be 0 upon deleting map entry."), "Invalid reader lock array length detected.")
} }