Add lock overload protection (#20876)

Reject new lock requests immediately when more than 1000 goroutines are
queued for the local lock mutex.

Unlocking, refreshing, and maintenance requests are never rejected; they only add to the count.

The limit is set high enough to allow for bursty behavior, but low enough to
keep lock requests from overloading the server completely.
Klaus Post 2025-01-31 11:54:34 -08:00 committed by GitHub
parent abb385af41
commit bdb3db6dad
4 changed files with 290 additions and 122 deletions
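
As a quick orientation before the diff: the change gates the local lock mutex behind an atomic count of queued goroutines. The sketch below is illustrative only (the names gatedLocker, waitLimit, and held are hypothetical, not MinIO code; the real implementation uses localLocker.waitMutex, getMutex, and lockMutexWaitLimit as shown in the diff), but it captures the pattern: count a goroutine as queued before taking the mutex, reject new Lock calls outright once the queue is too deep, and never reject Unlock, which merely adds to the count while it waits.

// overloadgate.go - minimal sketch of the lock overload gate (illustrative, not the MinIO source).
package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
)

// Reject new lock requests once this many goroutines are queued on the mutex.
const waitLimit = 1000

type gatedLocker struct {
    mu      sync.Mutex
    waiting atomic.Int32 // goroutines currently queued for mu
    held    map[string]bool
}

// acquire counts the caller as queued, takes the mutex, and returns the unlock func.
func (g *gatedLocker) acquire() func() {
    g.waiting.Add(1)
    g.mu.Lock()
    g.waiting.Add(-1)
    return g.mu.Unlock
}

// Lock rejects immediately when the queue is too deep; rejected callers are expected to retry.
func (g *gatedLocker) Lock(resource string) (bool, error) {
    if g.waiting.Load() > waitLimit {
        return false, errors.New("lock queue overloaded, retry later")
    }
    defer g.acquire()() // acquire() runs now; the returned unlock runs on return
    if g.held[resource] {
        return false, nil // already locked
    }
    g.held[resource] = true
    return true, nil
}

// Unlock is never rejected; it only adds to the queue count while waiting.
func (g *gatedLocker) Unlock(resource string) {
    defer g.acquire()()
    delete(g.held, resource)
}

func main() {
    g := &gatedLocker{held: make(map[string]bool)}
    ok, err := g.Lock("bucket/object")
    fmt.Println(ok, err) // true <nil>
    g.Unlock("bucket/object")
}

The rejection path costs a single atomic load before the mutex is ever touched, which is what keeps an overloaded server responsive enough for rejected clients to back off and retry.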

View File

@@ -24,11 +24,20 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/minio/minio/internal/dsync"
 )
 
+// Reject new lock requests immediately when this many are queued
+// for the local lock mutex.
+// We do not block unlocking or maintenance, but they add to the count.
+// The limit is set to allow for bursty behavior,
+// but prevent requests to overload the server completely.
+// Rejected clients are expected to retry.
+const lockMutexWaitLimit = 1000
+
 // lockRequesterInfo stores various info from the client for each lock that is requested.
 type lockRequesterInfo struct {
 	Name            string // name of the resource lock was requested for
@@ -52,9 +61,25 @@ func isWriteLock(lri []lockRequesterInfo) bool {
 //
 //msgp:ignore localLocker
 type localLocker struct {
-	mutex   sync.Mutex
-	lockMap map[string][]lockRequesterInfo
-	lockUID map[string]string // UUID -> resource map.
+	mutex     sync.Mutex
+	waitMutex atomic.Int32
+	lockMap   map[string][]lockRequesterInfo
+	lockUID   map[string]string // UUID -> resource map.
+
+	// the following are updated on every cleanup defined in lockValidityDuration
+	readers         atomic.Int32
+	writers         atomic.Int32
+	lastCleanup     atomic.Pointer[time.Time]
+	locksOverloaded atomic.Int64
+}
+
+// getMutex will lock the mutex.
+// Call the returned function to unlock.
+func (l *localLocker) getMutex() func() {
+	l.waitMutex.Add(1)
+	l.mutex.Lock()
+	l.waitMutex.Add(-1)
+	return l.mutex.Unlock
 }
 
 func (l *localLocker) String() string {
@@ -76,9 +101,16 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
 		return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
 	}
 
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
-
+	// If we have too many waiting, reject this at once.
+	if l.waitMutex.Load() > lockMutexWaitLimit {
+		l.locksOverloaded.Add(1)
+		return false, nil
+	}
+	// Wait for mutex
+	defer l.getMutex()()
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
 	if !l.canTakeLock(args.Resources...) {
 		// Not all locks can be taken on resources,
 		// reject it completely.
@@ -117,9 +149,7 @@ func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool
 		return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
 	}
 
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
-	err = nil
+	defer l.getMutex()()
 
 	for _, resource := range args.Resources {
 		lri, ok := l.lockMap[resource]
@@ -164,9 +194,17 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
 	if len(args.Resources) != 1 {
 		return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
 	}
 
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
+	// If we have too many waiting, reject this at once.
+	if l.waitMutex.Load() > lockMutexWaitLimit {
+		l.locksOverloaded.Add(1)
+		return false, nil
+	}
+	// Wait for mutex
+	defer l.getMutex()()
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
 	resource := args.Resources[0]
 	now := UTCNow()
 	lrInfo := lockRequesterInfo{
@@ -200,8 +238,7 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
 		return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
 	}
 
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
+	defer l.getMutex()()
 
 	var lri []lockRequesterInfo
 	resource := args.Resources[0]
@@ -218,35 +255,28 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
 }
 
 type lockStats struct {
-	Total  int
-	Writes int
-	Reads  int
+	Total          int
+	Writes         int
+	Reads          int
+	LockQueue      int
+	LocksAbandoned int
+	LastCleanup    *time.Time
 }
 
 func (l *localLocker) stats() lockStats {
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
-
-	st := lockStats{Total: len(l.lockMap)}
-	for _, v := range l.lockMap {
-		if len(v) == 0 {
-			continue
-		}
-		entry := v[0]
-		if entry.Writer {
-			st.Writes++
-		} else {
-			st.Reads += len(v)
-		}
+	return lockStats{
+		Total:       len(l.lockMap),
+		Reads:       int(l.readers.Load()),
+		Writes:      int(l.writers.Load()),
+		LockQueue:   int(l.waitMutex.Load()),
+		LastCleanup: l.lastCleanup.Load(),
 	}
-	return st
 }
 
 type localLockMap map[string][]lockRequesterInfo
 
 func (l *localLocker) DupLockMap() localLockMap {
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
+	defer l.getMutex()()
 
 	lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
 	for k, v := range l.lockMap {
@@ -274,108 +304,110 @@ func (l *localLocker) IsLocal() bool {
 }
 
 func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
-		l.mutex.Lock()
-		defer l.mutex.Unlock()
-		if len(args.UID) == 0 {
-			for _, resource := range args.Resources {
-				lris, ok := l.lockMap[resource]
-				if !ok {
-					continue
-				}
-				// Collect uids, so we don't mutate while we delete
-				uids := make([]string, 0, len(lris))
-				for _, lri := range lris {
-					uids = append(uids, lri.UID)
-				}
-
-				// Delete collected uids:
-				for _, uid := range uids {
-					lris, ok := l.lockMap[resource]
-					if !ok {
-						// Just to be safe, delete uuids.
-						for idx := 0; idx < maxDeleteList; idx++ {
-							mapID := formatUUID(uid, idx)
-							if _, ok := l.lockUID[mapID]; !ok {
-								break
-							}
-							delete(l.lockUID, mapID)
-						}
-						continue
-					}
-					l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
-				}
-			}
-			return true, nil
-		}
-
-		idx := 0
-		for {
-			mapID := formatUUID(args.UID, idx)
-			resource, ok := l.lockUID[mapID]
-			if !ok {
-				return idx > 0, nil
-			}
-			lris, ok := l.lockMap[resource]
-			if !ok {
-				// Unexpected inconsistency, delete.
-				delete(l.lockUID, mapID)
-				idx++
-				continue
-			}
-			reply = true
-			l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
-			idx++
-		}
-	}
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+
+	defer l.getMutex()()
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+	if len(args.UID) == 0 {
+		for _, resource := range args.Resources {
+			lris, ok := l.lockMap[resource]
+			if !ok {
+				continue
+			}
+			// Collect uids, so we don't mutate while we delete
+			uids := make([]string, 0, len(lris))
+			for _, lri := range lris {
+				uids = append(uids, lri.UID)
+			}
+
+			// Delete collected uids:
+			for _, uid := range uids {
+				lris, ok := l.lockMap[resource]
+				if !ok {
+					// Just to be safe, delete uuids.
+					for idx := 0; idx < maxDeleteList; idx++ {
+						mapID := formatUUID(uid, idx)
+						if _, ok := l.lockUID[mapID]; !ok {
+							break
+						}
+						delete(l.lockUID, mapID)
+					}
+					continue
+				}
+				l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
+			}
+		}
+		return true, nil
+	}
+
+	idx := 0
+	for {
+		mapID := formatUUID(args.UID, idx)
+		resource, ok := l.lockUID[mapID]
+		if !ok {
+			return idx > 0, nil
+		}
+		lris, ok := l.lockMap[resource]
+		if !ok {
+			// Unexpected inconsistency, delete.
+			delete(l.lockUID, mapID)
+			idx++
+			continue
+		}
+		reply = true
+		l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
+		idx++
+	}
 }
 
 func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
-		l.mutex.Lock()
-		defer l.mutex.Unlock()
-
-		// Check whether uid is still active.
-		resource, ok := l.lockUID[formatUUID(args.UID, 0)]
-		if !ok {
-			return false, nil
-		}
-		idx := 0
-		for {
-			lris, ok := l.lockMap[resource]
-			if !ok {
-				// Inconsistent. Delete UID.
-				delete(l.lockUID, formatUUID(args.UID, idx))
-				return idx > 0, nil
-			}
-			now := UTCNow()
-			for i := range lris {
-				if lris[i].UID == args.UID {
-					lris[i].TimeLastRefresh = now.UnixNano()
-				}
-			}
-			idx++
-			resource, ok = l.lockUID[formatUUID(args.UID, idx)]
-			if !ok {
-				// No more resources for UID, but we did update at least one.
-				return true, nil
-			}
-		}
-	}
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+
+	defer l.getMutex()()
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+
+	// Check whether uid is still active.
+	resource, ok := l.lockUID[formatUUID(args.UID, 0)]
+	if !ok {
+		return false, nil
+	}
+	idx := 0
+	for {
+		lris, ok := l.lockMap[resource]
+		if !ok {
+			// Inconsistent. Delete UID.
+			delete(l.lockUID, formatUUID(args.UID, idx))
+			return idx > 0, nil
+		}
+		now := UTCNow()
+		for i := range lris {
+			if lris[i].UID == args.UID {
+				lris[i].TimeLastRefresh = now.UnixNano()
+			}
+		}
+		idx++
+		resource, ok = l.lockUID[formatUUID(args.UID, idx)]
+		if !ok {
+			// No more resources for UID, but we did update at least one.
+			return true, nil
+		}
+	}
 }
 
 // Similar to removeEntry but only removes an entry only if the lock entry exists in map.
 // Caller must hold 'l.mutex' lock.
 func (l *localLocker) expireOldLocks(interval time.Duration) {
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
+	defer l.getMutex()()
 
+	var readers, writers int32
 	for k, lris := range l.lockMap {
 		modified := false
 		for i := 0; i < len(lris); {
@@ -393,6 +425,11 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
 				lris = append(lris[:i], lris[i+1:]...)
 				// Check same i
 			} else {
+				if lri.Writer {
+					writers++
+				} else {
+					readers++
+				}
 				// Move to next
 				i++
 			}
@@ -401,6 +438,10 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
 			l.lockMap[k] = lris
 		}
 	}
+	t := time.Now()
+	l.lastCleanup.Store(&t)
+	l.readers.Store(readers)
+	l.writers.Store(writers)
 }
 
 func newLocker() *localLocker {

View File

@@ -3,6 +3,8 @@ package cmd
 // Code generated by github.com/tinylib/msgp DO NOT EDIT.
 
 import (
+	"time"
+
 	"github.com/tinylib/msgp/msgp"
 )
 
@@ -506,6 +508,36 @@ func (z *lockStats) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "Reads")
 				return
 			}
+		case "LockQueue":
+			z.LockQueue, err = dc.ReadInt()
+			if err != nil {
+				err = msgp.WrapError(err, "LockQueue")
+				return
+			}
+		case "LocksAbandoned":
+			z.LocksAbandoned, err = dc.ReadInt()
+			if err != nil {
+				err = msgp.WrapError(err, "LocksAbandoned")
+				return
+			}
+		case "LastCleanup":
+			if dc.IsNil() {
+				err = dc.ReadNil()
+				if err != nil {
+					err = msgp.WrapError(err, "LastCleanup")
+					return
+				}
+				z.LastCleanup = nil
+			} else {
+				if z.LastCleanup == nil {
+					z.LastCleanup = new(time.Time)
+				}
+				*z.LastCleanup, err = dc.ReadTime()
+				if err != nil {
+					err = msgp.WrapError(err, "LastCleanup")
+					return
+				}
+			}
 		default:
 			err = dc.Skip()
 			if err != nil {
@@ -518,10 +550,10 @@ func (z *lockStats) DecodeMsg(dc *msgp.Reader) (err error) {
 }
 
 // EncodeMsg implements msgp.Encodable
-func (z lockStats) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 3
+func (z *lockStats) EncodeMsg(en *msgp.Writer) (err error) {
+	// map header, size 6
 	// write "Total"
-	err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
+	err = en.Append(0x86, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
 	if err != nil {
 		return
 	}
@@ -550,15 +582,52 @@ func (z lockStats) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Reads")
 		return
 	}
+	// write "LockQueue"
+	err = en.Append(0xa9, 0x4c, 0x6f, 0x63, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65)
+	if err != nil {
+		return
+	}
+	err = en.WriteInt(z.LockQueue)
+	if err != nil {
+		err = msgp.WrapError(err, "LockQueue")
+		return
+	}
+	// write "LocksAbandoned"
+	err = en.Append(0xae, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x62, 0x61, 0x6e, 0x64, 0x6f, 0x6e, 0x65, 0x64)
+	if err != nil {
+		return
+	}
+	err = en.WriteInt(z.LocksAbandoned)
+	if err != nil {
+		err = msgp.WrapError(err, "LocksAbandoned")
+		return
+	}
+	// write "LastCleanup"
+	err = en.Append(0xab, 0x4c, 0x61, 0x73, 0x74, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70)
+	if err != nil {
+		return
+	}
+	if z.LastCleanup == nil {
+		err = en.WriteNil()
+		if err != nil {
+			return
+		}
+	} else {
+		err = en.WriteTime(*z.LastCleanup)
+		if err != nil {
+			err = msgp.WrapError(err, "LastCleanup")
+			return
+		}
+	}
 	return
 }
 
 // MarshalMsg implements msgp.Marshaler
-func (z lockStats) MarshalMsg(b []byte) (o []byte, err error) {
+func (z *lockStats) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 3
+	// map header, size 6
 	// string "Total"
-	o = append(o, 0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
+	o = append(o, 0x86, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
 	o = msgp.AppendInt(o, z.Total)
 	// string "Writes"
 	o = append(o, 0xa6, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73)
@@ -566,6 +635,19 @@ func (z lockStats) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "Reads"
 	o = append(o, 0xa5, 0x52, 0x65, 0x61, 0x64, 0x73)
 	o = msgp.AppendInt(o, z.Reads)
+	// string "LockQueue"
+	o = append(o, 0xa9, 0x4c, 0x6f, 0x63, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65)
+	o = msgp.AppendInt(o, z.LockQueue)
+	// string "LocksAbandoned"
+	o = append(o, 0xae, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x62, 0x61, 0x6e, 0x64, 0x6f, 0x6e, 0x65, 0x64)
+	o = msgp.AppendInt(o, z.LocksAbandoned)
+	// string "LastCleanup"
+	o = append(o, 0xab, 0x4c, 0x61, 0x73, 0x74, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70)
+	if z.LastCleanup == nil {
+		o = msgp.AppendNil(o)
+	} else {
+		o = msgp.AppendTime(o, *z.LastCleanup)
+	}
 	return
 }
@@ -605,6 +687,35 @@ func (z *lockStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				err = msgp.WrapError(err, "Reads")
 				return
 			}
+		case "LockQueue":
+			z.LockQueue, bts, err = msgp.ReadIntBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "LockQueue")
+				return
+			}
+		case "LocksAbandoned":
+			z.LocksAbandoned, bts, err = msgp.ReadIntBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "LocksAbandoned")
+				return
+			}
+		case "LastCleanup":
+			if msgp.IsNil(bts) {
+				bts, err = msgp.ReadNilBytes(bts)
+				if err != nil {
+					return
+				}
+				z.LastCleanup = nil
+			} else {
+				if z.LastCleanup == nil {
+					z.LastCleanup = new(time.Time)
+				}
+				*z.LastCleanup, bts, err = msgp.ReadTimeBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "LastCleanup")
+					return
+				}
+			}
 		default:
 			bts, err = msgp.Skip(bts)
 			if err != nil {
@@ -618,7 +729,12 @@ func (z *lockStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
 }
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z lockStats) Msgsize() (s int) {
-	s = 1 + 6 + msgp.IntSize + 7 + msgp.IntSize + 6 + msgp.IntSize
+func (z *lockStats) Msgsize() (s int) {
+	s = 1 + 6 + msgp.IntSize + 7 + msgp.IntSize + 6 + msgp.IntSize + 10 + msgp.IntSize + 15 + msgp.IntSize + 12
+	if z.LastCleanup == nil {
+		s += msgp.NilSize
+	} else {
+		s += msgp.TimeSize
+	}
 	return
 }

View File

@@ -33,8 +33,12 @@ type lockRESTServer struct {
 
 // RefreshHandler - refresh the current lock
 func (l *lockRESTServer) RefreshHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) {
+	// Add a timeout similar to what we expect upstream.
+	ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.RefreshCall)
+	defer cancel()
+
 	resp := lockRPCRefresh.NewResponse()
-	refreshed, err := l.ll.Refresh(context.Background(), *args)
+	refreshed, err := l.ll.Refresh(ctx, *args)
 	if err != nil {
 		return l.makeResp(resp, err)
 	}
@@ -46,8 +50,11 @@ func (l *lockRESTServer) RefreshHandler(args *dsync.LockArgs) (*dsync.LockResp,
 
 // LockHandler - Acquires a lock.
 func (l *lockRESTServer) LockHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) {
+	// Add a timeout similar to what we expect upstream.
+	ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.Acquire)
+	defer cancel()
 	resp := lockRPCLock.NewResponse()
-	success, err := l.ll.Lock(context.Background(), *args)
+	success, err := l.ll.Lock(ctx, *args)
 	if err == nil && !success {
 		return l.makeResp(resp, errLockConflict)
 	}
@@ -65,8 +72,11 @@ func (l *lockRESTServer) UnlockHandler(args *dsync.LockArgs) (*dsync.LockResp, *
 
 // RLockHandler - Acquires an RLock.
 func (l *lockRESTServer) RLockHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) {
+	// Add a timeout similar to what we expect upstream.
+	ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.Acquire)
+	defer cancel()
 	resp := lockRPCRLock.NewResponse()
-	success, err := l.ll.RLock(context.Background(), *args)
+	success, err := l.ll.RLock(ctx, *args)
 	if err == nil && !success {
 		err = errLockConflict
 	}

View File

@@ -443,6 +443,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	// Special context for NetLockers - do not use timeouts.
 	// Also, pass the trace context info if found for debugging
 	netLockCtx := context.Background()
 	tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
 	if ok {
 		netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc)