Hold locks granularly in nslockMap (#6242)

With benchmarks increases the performance for small files
by almost 4x times the previous releases.
This commit is contained in:
Harshavardhana 2018-08-05 20:25:25 -07:00 committed by Nitish Tiwari
parent eabfcea34e
commit 13fbb96736

View File

@ -133,31 +133,31 @@ type nsLockMap struct {
// Indicates if namespace is part of a distributed setup. // Indicates if namespace is part of a distributed setup.
isDistXL bool isDistXL bool
lockMap map[nsParam]*nsLock lockMap map[nsParam]*nsLock
lockMapMutex sync.Mutex lockMapMutex sync.RWMutex
} }
// Lock the namespace resource. // Lock the namespace resource.
func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) { func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
var nsLk *nsLock var nsLk *nsLock
n.lockMapMutex.Lock()
n.lockMapMutex.Lock()
param := nsParam{volume, path} param := nsParam{volume, path}
nsLk, found := n.lockMap[param] nsLk, found := n.lockMap[param]
if !found { if !found {
nsLk = &nsLock{ n.lockMap[param] = &nsLock{
RWLockerSync: func() RWLockerSync { RWLockerSync: func() RWLockerSync {
if n.isDistXL { if n.isDistXL {
return dsync.NewDRWMutex(pathJoin(volume, path), globalDsync) return dsync.NewDRWMutex(pathJoin(volume, path), globalDsync)
} }
return &lsync.LRWMutex{} return &lsync.LRWMutex{}
}(), }(),
ref: 0, ref: 1,
} }
n.lockMap[param] = nsLk nsLk = n.lockMap[param]
} else {
// Update ref count here to avoid multiple races.
nsLk.ref++
} }
nsLk.ref++ // Update ref count here to avoid multiple races.
// Unlock map before Locking NS which might block.
n.lockMapMutex.Unlock() n.lockMapMutex.Unlock()
// Locking here will block (until timeout). // Locking here will block (until timeout).
@ -168,44 +168,44 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
} }
if !locked { // We failed to get the lock if !locked { // We failed to get the lock
// Decrement ref count since we failed to get the lock
n.lockMapMutex.Lock() n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock() nsLk.ref--
nsLk.ref-- // Decrement ref count since we failed to get the lock
if nsLk.ref == 0 { if nsLk.ref == 0 {
// Remove from the map if there are no more references. // Remove from the map if there are no more references.
delete(n.lockMap, param) delete(n.lockMap, param)
} }
n.lockMapMutex.Unlock()
} }
return return
} }
// Unlock the namespace resource. // Unlock the namespace resource.
func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
// nsLk.Unlock() will not block, hence locking the map for the
// entire function is fine.
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
param := nsParam{volume, path} param := nsParam{volume, path}
if nsLk, found := n.lockMap[param]; found { n.lockMapMutex.RLock()
if readLock { nsLk, found := n.lockMap[param]
nsLk.RUnlock() n.lockMapMutex.RUnlock()
} else { if !found {
nsLk.Unlock() return
} }
if nsLk.ref == 0 { if readLock {
logger.LogIf(context.Background(), errors.New("Namespace reference count cannot be 0")) nsLk.RUnlock()
} } else {
if nsLk.ref != 0 { nsLk.Unlock()
nsLk.ref-- }
} n.lockMapMutex.Lock()
if nsLk.ref == 0 {
logger.LogIf(context.Background(), errors.New("Namespace reference count cannot be 0"))
} else {
nsLk.ref--
if nsLk.ref == 0 { if nsLk.ref == 0 {
// Remove from the map if there are no more references. // Remove from the map if there are no more references.
delete(n.lockMap, param) delete(n.lockMap, param)
} }
} }
n.lockMapMutex.Unlock()
} }
// Lock - locks the given resource for writes, using a previously // Lock - locks the given resource for writes, using a previously