From cdb1b48ad9eef656f47fdd1f2db3ed3c01ce934e Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 31 Jan 2023 18:41:17 +0100 Subject: [PATCH] Make localLocker lock attempts cancellable (#16510) --- cmd/local-locker.go | 206 ++++++++++++++++------------ cmd/lock-rest-server-common_test.go | 6 +- 2 files changed, 117 insertions(+), 95 deletions(-) diff --git a/cmd/local-locker.go b/cmd/local-locker.go index c6e9a3ffb..8135cd812 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "strconv" - "sync" "time" "github.com/minio/minio/internal/dsync" @@ -51,7 +50,7 @@ func isWriteLock(lri []lockRequesterInfo) bool { // localLocker implements Dsync.NetLocker type localLocker struct { - mutex sync.Mutex + mutex chan struct{} lockMap map[string][]lockRequesterInfo lockUID map[string]string // UUID -> resource map. } @@ -70,13 +69,41 @@ func (l *localLocker) canTakeLock(resources ...string) bool { return true } +// lockMu locks the "mutex" of the local locker. +// If "ctx" is canceled before the lock can be obtained false is returned. +// If "true" is returned unlockMu MUST be called. +// Behavior is similar to sync.Mutex. +func (l *localLocker) lockMu(ctx context.Context) (ok bool) { + select { + case l.mutex <- struct{}{}: + return true + case <-ctx.Done(): + return false + } +} + +// lockMuBlock will block while getting the mutex. +// When returned unlockMu *must* be called. +// Behavior is similar to sync.Mutex. +func (l *localLocker) lockMuBlock() { + l.mutex <- struct{}{} +} + +// unlockMu unlocks an acquired mutex. +// This may only be called once after successfully getting a mutex. +func (l *localLocker) unlockMu() { + <-l.mutex +} + func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { if len(args.Resources) > maxDeleteList { return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList) } - l.mutex.Lock() - defer l.mutex.Unlock() + if !l.lockMu(ctx) { + return false, ctx.Err() + } + defer l.unlockMu() if !l.canTakeLock(args.Resources...) { // Not all locks can be taken on resources, @@ -115,8 +142,9 @@ func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList) } - l.mutex.Lock() - defer l.mutex.Unlock() + l.lockMuBlock() + defer l.unlockMu() + err = nil for _, resource := range args.Resources { @@ -163,8 +191,11 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource") } - l.mutex.Lock() - defer l.mutex.Unlock() + if !l.lockMu(ctx) { + return false, ctx.Err() + } + defer l.unlockMu() + resource := args.Resources[0] lrInfo := lockRequesterInfo{ Name: resource, @@ -196,8 +227,9 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource") } - l.mutex.Lock() - defer l.mutex.Unlock() + l.lockMuBlock() + defer l.unlockMu() + var lri []lockRequesterInfo resource := args.Resources[0] @@ -214,8 +246,8 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo } func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo { - l.mutex.Lock() - defer l.mutex.Unlock() + l.lockMuBlock() + defer l.unlockMu() lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap)) for k, v := range l.lockMap { @@ -239,106 +271,99 @@ func (l *localLocker) IsLocal() bool { } func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - l.mutex.Lock() - defer l.mutex.Unlock() - if len(args.UID) == 0 { - for _, resource := range args.Resources { - lris, ok := l.lockMap[resource] - if !ok { - continue - } - // Collect uids, so we don't mutate while we delete - uids := make([]string, 0, len(lris)) - for _, lri := range lris { - uids = append(uids, lri.UID) - } + l.lockMuBlock() + defer l.unlockMu() - // Delete collected uids: - for _, uid := range uids { - lris, ok := l.lockMap[resource] - if !ok { - // Just to be safe, delete uuids. - for idx := 0; idx < maxDeleteList; idx++ { - mapID := formatUUID(uid, idx) - if _, ok := l.lockUID[mapID]; !ok { - break - } - delete(l.lockUID, mapID) - } - continue - } - l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris) - } - } - return true, nil - } - - idx := 0 - for { - mapID := formatUUID(args.UID, idx) - resource, ok := l.lockUID[mapID] - if !ok { - return idx > 0, nil - } + if len(args.UID) == 0 { + for _, resource := range args.Resources { lris, ok := l.lockMap[resource] if !ok { - // Unexpected inconsistency, delete. - delete(l.lockUID, mapID) - idx++ continue } - reply = true - l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris) - idx++ + // Collect uids, so we don't mutate while we delete + uids := make([]string, 0, len(lris)) + for _, lri := range lris { + uids = append(uids, lri.UID) + } + + // Delete collected uids: + for _, uid := range uids { + lris, ok := l.lockMap[resource] + if !ok { + // Just to be safe, delete uuids. + for idx := 0; idx < maxDeleteList; idx++ { + mapID := formatUUID(uid, idx) + if _, ok := l.lockUID[mapID]; !ok { + break + } + delete(l.lockUID, mapID) + } + continue + } + l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris) + } } + return true, nil + } + + idx := 0 + for { + mapID := formatUUID(args.UID, idx) + resource, ok := l.lockUID[mapID] + if !ok { + return idx > 0, nil + } + lris, ok := l.lockMap[resource] + if !ok { + // Unexpected inconsistency, delete. + delete(l.lockUID, mapID) + idx++ + continue + } + reply = true + l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris) + idx++ } } func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) { - select { - case <-ctx.Done(): + if !l.lockMu(ctx) { return false, ctx.Err() - default: - l.mutex.Lock() - defer l.mutex.Unlock() + } + defer l.unlockMu() - // Check whether uid is still active. - resource, ok := l.lockUID[formatUUID(args.UID, 0)] + // Check whether uid is still active. + resource, ok := l.lockUID[formatUUID(args.UID, 0)] + if !ok { + return false, nil + } + idx := 0 + for { + lris, ok := l.lockMap[resource] if !ok { - return false, nil + // Inconsistent. Delete UID. + delete(l.lockUID, formatUUID(args.UID, idx)) + return idx > 0, nil } - idx := 0 - for { - lris, ok := l.lockMap[resource] - if !ok { - // Inconsistent. Delete UID. - delete(l.lockUID, formatUUID(args.UID, idx)) - return idx > 0, nil - } - for i := range lris { - if lris[i].UID == args.UID { - lris[i].TimeLastRefresh = UTCNow() - } - } - idx++ - resource, ok = l.lockUID[formatUUID(args.UID, idx)] - if !ok { - // No more resources for UID, but we did update at least one. - return true, nil + for i := range lris { + if lris[i].UID == args.UID { + lris[i].TimeLastRefresh = UTCNow() } } + idx++ + resource, ok = l.lockUID[formatUUID(args.UID, idx)] + if !ok { + // No more resources for UID, but we did update at least one. + return true, nil + } } } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Caller must hold 'l.mutex' lock. func (l *localLocker) expireOldLocks(interval time.Duration) { - l.mutex.Lock() - defer l.mutex.Unlock() + l.lockMuBlock() + defer l.unlockMu() for k, lris := range l.lockMap { modified := false @@ -369,6 +394,7 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { func newLocker() *localLocker { return &localLocker{ + mutex: make(chan struct{}, 1), lockMap: make(map[string][]lockRequesterInfo, 1000), lockUID: make(map[string]string, 1000), } diff --git a/cmd/lock-rest-server-common_test.go b/cmd/lock-rest-server-common_test.go index 1ef884532..de42a72f3 100644 --- a/cmd/lock-rest-server-common_test.go +++ b/cmd/lock-rest-server-common_test.go @@ -21,7 +21,6 @@ import ( "context" "os" "reflect" - "sync" "testing" "github.com/minio/minio/internal/dsync" @@ -38,10 +37,7 @@ func createLockTestServer(ctx context.Context, t *testing.T) (string, *lockRESTS } locker := &lockRESTServer{ - ll: &localLocker{ - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), - }, + ll: newLocker(), } creds := globalActiveCred token, err := authenticateNode(creds.AccessKey, creds.SecretKey, "")