Make localLocker lock attempts cancellable (#16510)

This commit is contained in:
Klaus Post 2023-01-31 18:41:17 +01:00 committed by GitHub
parent a24037bfec
commit cdb1b48ad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 95 deletions

View File

@ -21,7 +21,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/minio/minio/internal/dsync"
@ -51,7 +50,7 @@ func isWriteLock(lri []lockRequesterInfo) bool {
// localLocker implements Dsync.NetLocker
type localLocker struct {
mutex sync.Mutex
mutex chan struct{}
lockMap map[string][]lockRequesterInfo
lockUID map[string]string // UUID -> resource map.
}
@ -70,13 +69,41 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
return true
}
// lockMu locks the "mutex" of the local locker.
// If "ctx" is canceled before the lock can be obtained false is returned.
// If "true" is returned unlockMu MUST be called.
// Behavior is similar to sync.Mutex.
func (l *localLocker) lockMu(ctx context.Context) (ok bool) {
select {
case l.mutex <- struct{}{}:
return true
case <-ctx.Done():
return false
}
}
// lockMuBlock will block while getting the mutex.
// When returned unlockMu *must* be called.
// Behavior is similar to sync.Mutex.
func (l *localLocker) lockMuBlock() {
l.mutex <- struct{}{}
}
// unlockMu unlocks an acquired mutex.
// This may only be called once after successfully getting a mutex.
func (l *localLocker) unlockMu() {
<-l.mutex
}
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
if len(args.Resources) > maxDeleteList {
return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
}
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.lockMu(ctx) {
return false, ctx.Err()
}
defer l.unlockMu()
if !l.canTakeLock(args.Resources...) {
// Not all locks can be taken on resources,
@ -115,8 +142,9 @@ func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool
return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
}
l.mutex.Lock()
defer l.mutex.Unlock()
l.lockMuBlock()
defer l.unlockMu()
err = nil
for _, resource := range args.Resources {
@ -163,8 +191,11 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
}
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.lockMu(ctx) {
return false, ctx.Err()
}
defer l.unlockMu()
resource := args.Resources[0]
lrInfo := lockRequesterInfo{
Name: resource,
@ -196,8 +227,9 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
}
l.mutex.Lock()
defer l.mutex.Unlock()
l.lockMuBlock()
defer l.unlockMu()
var lri []lockRequesterInfo
resource := args.Resources[0]
@ -214,8 +246,8 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
}
func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
l.mutex.Lock()
defer l.mutex.Unlock()
l.lockMuBlock()
defer l.unlockMu()
lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
for k, v := range l.lockMap {
@ -239,106 +271,99 @@ func (l *localLocker) IsLocal() bool {
}
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) == 0 {
for _, resource := range args.Resources {
lris, ok := l.lockMap[resource]
if !ok {
continue
}
// Collect uids, so we don't mutate while we delete
uids := make([]string, 0, len(lris))
for _, lri := range lris {
uids = append(uids, lri.UID)
}
l.lockMuBlock()
defer l.unlockMu()
// Delete collected uids:
for _, uid := range uids {
lris, ok := l.lockMap[resource]
if !ok {
// Just to be safe, delete uuids.
for idx := 0; idx < maxDeleteList; idx++ {
mapID := formatUUID(uid, idx)
if _, ok := l.lockUID[mapID]; !ok {
break
}
delete(l.lockUID, mapID)
}
continue
}
l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
}
}
return true, nil
}
idx := 0
for {
mapID := formatUUID(args.UID, idx)
resource, ok := l.lockUID[mapID]
if !ok {
return idx > 0, nil
}
if len(args.UID) == 0 {
for _, resource := range args.Resources {
lris, ok := l.lockMap[resource]
if !ok {
// Unexpected inconsistency, delete.
delete(l.lockUID, mapID)
idx++
continue
}
reply = true
l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
idx++
// Collect uids, so we don't mutate while we delete
uids := make([]string, 0, len(lris))
for _, lri := range lris {
uids = append(uids, lri.UID)
}
// Delete collected uids:
for _, uid := range uids {
lris, ok := l.lockMap[resource]
if !ok {
// Just to be safe, delete uuids.
for idx := 0; idx < maxDeleteList; idx++ {
mapID := formatUUID(uid, idx)
if _, ok := l.lockUID[mapID]; !ok {
break
}
delete(l.lockUID, mapID)
}
continue
}
l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
}
}
return true, nil
}
idx := 0
for {
mapID := formatUUID(args.UID, idx)
resource, ok := l.lockUID[mapID]
if !ok {
return idx > 0, nil
}
lris, ok := l.lockMap[resource]
if !ok {
// Unexpected inconsistency, delete.
delete(l.lockUID, mapID)
idx++
continue
}
reply = true
l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
idx++
}
}
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
select {
case <-ctx.Done():
if !l.lockMu(ctx) {
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
}
defer l.unlockMu()
// Check whether uid is still active.
resource, ok := l.lockUID[formatUUID(args.UID, 0)]
// Check whether uid is still active.
resource, ok := l.lockUID[formatUUID(args.UID, 0)]
if !ok {
return false, nil
}
idx := 0
for {
lris, ok := l.lockMap[resource]
if !ok {
return false, nil
// Inconsistent. Delete UID.
delete(l.lockUID, formatUUID(args.UID, idx))
return idx > 0, nil
}
idx := 0
for {
lris, ok := l.lockMap[resource]
if !ok {
// Inconsistent. Delete UID.
delete(l.lockUID, formatUUID(args.UID, idx))
return idx > 0, nil
}
for i := range lris {
if lris[i].UID == args.UID {
lris[i].TimeLastRefresh = UTCNow()
}
}
idx++
resource, ok = l.lockUID[formatUUID(args.UID, idx)]
if !ok {
// No more resources for UID, but we did update at least one.
return true, nil
for i := range lris {
if lris[i].UID == args.UID {
lris[i].TimeLastRefresh = UTCNow()
}
}
idx++
resource, ok = l.lockUID[formatUUID(args.UID, idx)]
if !ok {
// No more resources for UID, but we did update at least one.
return true, nil
}
}
}
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
// Caller must hold 'l.mutex' lock.
func (l *localLocker) expireOldLocks(interval time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.lockMuBlock()
defer l.unlockMu()
for k, lris := range l.lockMap {
modified := false
@ -369,6 +394,7 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
func newLocker() *localLocker {
return &localLocker{
mutex: make(chan struct{}, 1),
lockMap: make(map[string][]lockRequesterInfo, 1000),
lockUID: make(map[string]string, 1000),
}

View File

@ -21,7 +21,6 @@ import (
"context"
"os"
"reflect"
"sync"
"testing"
"github.com/minio/minio/internal/dsync"
@ -38,10 +37,7 @@ func createLockTestServer(ctx context.Context, t *testing.T) (string, *lockRESTS
}
locker := &lockRESTServer{
ll: &localLocker{
mutex: sync.Mutex{},
lockMap: make(map[string][]lockRequesterInfo),
},
ll: newLocker(),
}
creds := globalActiveCred
token, err := authenticateNode(creds.AccessKey, creds.SecretKey, "")