lock: Fix Refresh logic with multi resources lock (#13092)

A multi resources lock is a single lock UID with multiple associated
resources. This is created for example by multi objects delete
operation. This commit changes the behavior of Refresh() to iterate over
all locks having the same UID and refresh them.

Bonus: Fix showing top locks for multi delete objects
This commit is contained in:
Anis Elleuch 2021-08-27 21:07:55 +01:00 committed by GitHub
parent 2451b9a75a
commit e05886561d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 28 deletions

View File

@ -369,10 +369,10 @@ func topLockEntries(peerLocks []*PeerLocks, stale bool) madmin.LockEntries {
} }
for k, v := range peerLock.Locks { for k, v := range peerLock.Locks {
for _, lockReqInfo := range v { for _, lockReqInfo := range v {
if val, ok := entryMap[lockReqInfo.UID]; ok { if val, ok := entryMap[lockReqInfo.Name]; ok {
val.ServerList = append(val.ServerList, peerLock.Addr) val.ServerList = append(val.ServerList, peerLock.Addr)
} else { } else {
entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr) entryMap[lockReqInfo.Name] = lriToLockEntry(lockReqInfo, k, peerLock.Addr)
} }
} }
} }

View File

@ -234,15 +234,17 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep
return true, nil return true, nil
} }
lockFound := false
for _, lris := range l.lockMap { for _, lris := range l.lockMap {
for _, lri := range lris { for _, lri := range lris {
if lri.UID == args.UID { if lri.UID == args.UID {
l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris) l.removeEntry(lri.Name, dsync.LockArgs{UID: lri.UID}, &lris)
return true, nil lockFound = true
} }
} }
} }
return false, nil
return lockFound, nil
} }
} }
@ -254,24 +256,18 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
resource := args.Resources[0] // refresh check is always per resource. lockFound := false
for _, lri := range l.lockMap {
// Lock found, proceed to verify if belongs to given uid.
lri, ok := l.lockMap[resource]
if !ok {
// lock doesn't exist yet, return false
return false, nil
}
// Check whether uid is still active // Check whether uid is still active
for i := range lri { for i := range lri {
if lri[i].UID == args.UID && lri[i].Owner == args.Owner { if lri[i].UID == args.UID {
lri[i].TimeLastRefresh = UTCNow() lri[i].TimeLastRefresh = UTCNow()
return true, nil lockFound = true
}
} }
} }
return false, nil return lockFound, nil
} }
} }

View File

@ -238,7 +238,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
case <-refreshTimer.C: case <-refreshTimer.C:
refreshTimer.Reset(drwMutexRefreshInterval) refreshTimer.Reset(drwMutexRefreshInterval)
refreshed, err := refresh(ctx, dm.clnt, id, source, quorum, dm.Names...) refreshed, err := refresh(ctx, dm.clnt, id, source, quorum)
if err == nil && !refreshed { if err == nil && !refreshed {
// Clean the lock locally and in remote nodes // Clean the lock locally and in remote nodes
forceUnlock(ctx, dm.clnt, id) forceUnlock(ctx, dm.clnt, id)
@ -279,8 +279,8 @@ type refreshResult struct {
succeeded bool succeeded bool
} }
func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int, lockNames ...string) (bool, error) { func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (bool, error) {
restClnts, owner := ds.GetLockers() restClnts, _ := ds.GetLockers()
// Create buffered channel of size equal to total number of nodes. // Create buffered channel of size equal to total number of nodes.
ch := make(chan refreshResult, len(restClnts)) ch := make(chan refreshResult, len(restClnts))
@ -298,11 +298,7 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int, lock
} }
args := LockArgs{ args := LockArgs{
Owner: owner,
UID: id, UID: id,
Resources: lockNames,
Source: source,
Quorum: quorum,
} }
ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout) ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)