From bdb3db6dadf48f0460a3ed24f943cfd19f218b99 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 31 Jan 2025 11:54:34 -0800 Subject: [PATCH] Add lock overload protection (#20876) Reject new lock requests immediately when 1000 goroutines are queued for the local lock mutex. We do not reject unlocking, refreshing, or maintenance; they add to the count. The limit is set to allow for bursty behavior but prevent requests from overloading the server completely. --- cmd/local-locker.go | 263 +++++++++++++++++++++---------------- cmd/local-locker_gen.go | 132 +++++++++++++++++-- cmd/lock-rest-server.go | 16 ++- internal/dsync/drwmutex.go | 1 + 4 files changed, 290 insertions(+), 122 deletions(-) diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 47ffa9719..e5904d508 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -24,11 +24,20 @@ import ( "fmt" "strconv" "sync" + "sync/atomic" "time" "github.com/minio/minio/internal/dsync" ) +// Reject new lock requests immediately when this many are queued +// for the local lock mutex. +// We do not block unlocking or maintenance, but they add to the count. +// The limit is set to allow for bursty behavior, +// but prevent requests to overload the server completely. +// Rejected clients are expected to retry. +const lockMutexWaitLimit = 1000 + // lockRequesterInfo stores various info from the client for each lock that is requested. type lockRequesterInfo struct { Name string // name of the resource lock was requested for @@ -52,9 +61,25 @@ func isWriteLock(lri []lockRequesterInfo) bool { // //msgp:ignore localLocker type localLocker struct { - mutex sync.Mutex - lockMap map[string][]lockRequesterInfo - lockUID map[string]string // UUID -> resource map. + mutex sync.Mutex + waitMutex atomic.Int32 + lockMap map[string][]lockRequesterInfo + lockUID map[string]string // UUID -> resource map. + + // the following are updated on every cleanup defined in lockValidityDuration + readers atomic.Int32 + writers atomic.Int32 + lastCleanup atomic.Pointer[time.Time] + locksOverloaded atomic.Int64 +} + +// getMutex will lock the mutex. +// Call the returned function to unlock. +func (l *localLocker) getMutex() func() { + l.waitMutex.Add(1) + l.mutex.Lock() + l.waitMutex.Add(-1) + return l.mutex.Unlock } func (l *localLocker) String() string { @@ -76,9 +101,16 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList) } - l.mutex.Lock() - defer l.mutex.Unlock() - + // If we have too many waiting, reject this at once. + if l.waitMutex.Load() > lockMutexWaitLimit { + l.locksOverloaded.Add(1) + return false, nil + } + // Wait for mutex + defer l.getMutex()() + if ctx.Err() != nil { + return false, ctx.Err() + } if !l.canTakeLock(args.Resources...) { // Not all locks can be taken on resources, // reject it completely. 
@@ -117,9 +149,7 @@ func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList) } - l.mutex.Lock() - defer l.mutex.Unlock() - err = nil + defer l.getMutex()() for _, resource := range args.Resources { lri, ok := l.lockMap[resource] @@ -164,9 +194,17 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo if len(args.Resources) != 1 { return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource") } + // If we have too many waiting, reject this at once. + if l.waitMutex.Load() > lockMutexWaitLimit { + l.locksOverloaded.Add(1) + return false, nil + } - l.mutex.Lock() - defer l.mutex.Unlock() + // Wait for mutex + defer l.getMutex()() + if ctx.Err() != nil { + return false, ctx.Err() + } resource := args.Resources[0] now := UTCNow() lrInfo := lockRequesterInfo{ @@ -200,8 +238,7 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource") } - l.mutex.Lock() - defer l.mutex.Unlock() + defer l.getMutex()() var lri []lockRequesterInfo resource := args.Resources[0] @@ -218,35 +255,28 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo } type lockStats struct { - Total int - Writes int - Reads int + Total int + Writes int + Reads int + LockQueue int + LocksAbandoned int + LastCleanup *time.Time } func (l *localLocker) stats() lockStats { - l.mutex.Lock() - defer l.mutex.Unlock() - - st := lockStats{Total: len(l.lockMap)} - for _, v := range l.lockMap { - if len(v) == 0 { - continue - } - entry := v[0] - if entry.Writer { - st.Writes++ - } else { - st.Reads += len(v) - } + return lockStats{ + Total: len(l.lockMap), + Reads: int(l.readers.Load()), + Writes: int(l.writers.Load()), + LockQueue: int(l.waitMutex.Load()), + LastCleanup: l.lastCleanup.Load(), } - return st } type localLockMap map[string][]lockRequesterInfo func (l *localLocker) DupLockMap() localLockMap { - l.mutex.Lock() - defer l.mutex.Unlock() + defer l.getMutex()() lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap)) for k, v := range l.lockMap { @@ -274,108 +304,110 @@ func (l *localLocker) IsLocal() bool { } func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return false, ctx.Err() - default: - l.mutex.Lock() - defer l.mutex.Unlock() - if len(args.UID) == 0 { - for _, resource := range args.Resources { - lris, ok := l.lockMap[resource] - if !ok { - continue - } - // Collect uids, so we don't mutate while we delete - uids := make([]string, 0, len(lris)) - for _, lri := range lris { - uids = append(uids, lri.UID) - } + } - // Delete collected uids: - for _, uid := range uids { - lris, ok := l.lockMap[resource] - if !ok { - // Just to be safe, delete uuids. 
- for idx := 0; idx < maxDeleteList; idx++ { - mapID := formatUUID(uid, idx) - if _, ok := l.lockUID[mapID]; !ok { - break - } - delete(l.lockUID, mapID) - } - continue - } - l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris) - } - } - return true, nil - } - - idx := 0 - for { - mapID := formatUUID(args.UID, idx) - resource, ok := l.lockUID[mapID] - if !ok { - return idx > 0, nil - } + defer l.getMutex()() + if ctx.Err() != nil { + return false, ctx.Err() + } + if len(args.UID) == 0 { + for _, resource := range args.Resources { lris, ok := l.lockMap[resource] if !ok { - // Unexpected inconsistency, delete. - delete(l.lockUID, mapID) - idx++ continue } - reply = true - l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris) - idx++ + // Collect uids, so we don't mutate while we delete + uids := make([]string, 0, len(lris)) + for _, lri := range lris { + uids = append(uids, lri.UID) + } + + // Delete collected uids: + for _, uid := range uids { + lris, ok := l.lockMap[resource] + if !ok { + // Just to be safe, delete uuids. + for idx := 0; idx < maxDeleteList; idx++ { + mapID := formatUUID(uid, idx) + if _, ok := l.lockUID[mapID]; !ok { + break + } + delete(l.lockUID, mapID) + } + continue + } + l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris) + } } + return true, nil + } + + idx := 0 + for { + mapID := formatUUID(args.UID, idx) + resource, ok := l.lockUID[mapID] + if !ok { + return idx > 0, nil + } + lris, ok := l.lockMap[resource] + if !ok { + // Unexpected inconsistency, delete. + delete(l.lockUID, mapID) + idx++ + continue + } + reply = true + l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris) + idx++ } } func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return false, ctx.Err() - default: - l.mutex.Lock() - defer l.mutex.Unlock() + } - // Check whether uid is still active. - resource, ok := l.lockUID[formatUUID(args.UID, 0)] + defer l.getMutex()() + if ctx.Err() != nil { + return false, ctx.Err() + } + + // Check whether uid is still active. + resource, ok := l.lockUID[formatUUID(args.UID, 0)] + if !ok { + return false, nil + } + idx := 0 + for { + lris, ok := l.lockMap[resource] if !ok { - return false, nil + // Inconsistent. Delete UID. + delete(l.lockUID, formatUUID(args.UID, idx)) + return idx > 0, nil } - idx := 0 - for { - lris, ok := l.lockMap[resource] - if !ok { - // Inconsistent. Delete UID. - delete(l.lockUID, formatUUID(args.UID, idx)) - return idx > 0, nil - } - now := UTCNow() - for i := range lris { - if lris[i].UID == args.UID { - lris[i].TimeLastRefresh = now.UnixNano() - } - } - idx++ - resource, ok = l.lockUID[formatUUID(args.UID, idx)] - if !ok { - // No more resources for UID, but we did update at least one. - return true, nil + now := UTCNow() + for i := range lris { + if lris[i].UID == args.UID { + lris[i].TimeLastRefresh = now.UnixNano() } } + idx++ + resource, ok = l.lockUID[formatUUID(args.UID, idx)] + if !ok { + // No more resources for UID, but we did update at least one. + return true, nil + } } } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Caller must hold 'l.mutex' lock. 
func (l *localLocker) expireOldLocks(interval time.Duration) { - l.mutex.Lock() - defer l.mutex.Unlock() + defer l.getMutex()() + var readers, writers int32 for k, lris := range l.lockMap { modified := false for i := 0; i < len(lris); { @@ -393,6 +425,11 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { lris = append(lris[:i], lris[i+1:]...) // Check same i } else { + if lri.Writer { + writers++ + } else { + readers++ + } // Move to next i++ } @@ -401,6 +438,10 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { l.lockMap[k] = lris } } + t := time.Now() + l.lastCleanup.Store(&t) + l.readers.Store(readers) + l.writers.Store(writers) } func newLocker() *localLocker { diff --git a/cmd/local-locker_gen.go b/cmd/local-locker_gen.go index bfd61e677..cd9df8724 100644 --- a/cmd/local-locker_gen.go +++ b/cmd/local-locker_gen.go @@ -3,6 +3,8 @@ package cmd // Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( + "time" + "github.com/tinylib/msgp/msgp" ) @@ -506,6 +508,36 @@ func (z *lockStats) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Reads") return } + case "LockQueue": + z.LockQueue, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "LockQueue") + return + } + case "LocksAbandoned": + z.LocksAbandoned, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "LocksAbandoned") + return + } + case "LastCleanup": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "LastCleanup") + return + } + z.LastCleanup = nil + } else { + if z.LastCleanup == nil { + z.LastCleanup = new(time.Time) + } + *z.LastCleanup, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "LastCleanup") + return + } + } default: err = dc.Skip() if err != nil { @@ -518,10 +550,10 @@ func (z *lockStats) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z lockStats) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 +func (z *lockStats) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 // write "Total" - err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + err = en.Append(0x86, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) if err != nil { return } @@ -550,15 +582,52 @@ func (z lockStats) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Reads") return } + // write "LockQueue" + err = en.Append(0xa9, 0x4c, 0x6f, 0x63, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteInt(z.LockQueue) + if err != nil { + err = msgp.WrapError(err, "LockQueue") + return + } + // write "LocksAbandoned" + err = en.Append(0xae, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x62, 0x61, 0x6e, 0x64, 0x6f, 0x6e, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteInt(z.LocksAbandoned) + if err != nil { + err = msgp.WrapError(err, "LocksAbandoned") + return + } + // write "LastCleanup" + err = en.Append(0xab, 0x4c, 0x61, 0x73, 0x74, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70) + if err != nil { + return + } + if z.LastCleanup == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteTime(*z.LastCleanup) + if err != nil { + err = msgp.WrapError(err, "LastCleanup") + return + } + } return } // MarshalMsg implements msgp.Marshaler -func (z lockStats) MarshalMsg(b []byte) (o []byte, err error) { +func (z *lockStats) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 3 + // map header, size 6 // string "Total" - o = append(o, 0x83, 0xa5, 0x54, 
0x6f, 0x74, 0x61, 0x6c) + o = append(o, 0x86, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) o = msgp.AppendInt(o, z.Total) // string "Writes" o = append(o, 0xa6, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73) @@ -566,6 +635,19 @@ func (z lockStats) MarshalMsg(b []byte) (o []byte, err error) { // string "Reads" o = append(o, 0xa5, 0x52, 0x65, 0x61, 0x64, 0x73) o = msgp.AppendInt(o, z.Reads) + // string "LockQueue" + o = append(o, 0xa9, 0x4c, 0x6f, 0x63, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65) + o = msgp.AppendInt(o, z.LockQueue) + // string "LocksAbandoned" + o = append(o, 0xae, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x62, 0x61, 0x6e, 0x64, 0x6f, 0x6e, 0x65, 0x64) + o = msgp.AppendInt(o, z.LocksAbandoned) + // string "LastCleanup" + o = append(o, 0xab, 0x4c, 0x61, 0x73, 0x74, 0x43, 0x6c, 0x65, 0x61, 0x6e, 0x75, 0x70) + if z.LastCleanup == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendTime(o, *z.LastCleanup) + } return } @@ -605,6 +687,35 @@ func (z *lockStats) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Reads") return } + case "LockQueue": + z.LockQueue, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LockQueue") + return + } + case "LocksAbandoned": + z.LocksAbandoned, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LocksAbandoned") + return + } + case "LastCleanup": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.LastCleanup = nil + } else { + if z.LastCleanup == nil { + z.LastCleanup = new(time.Time) + } + *z.LastCleanup, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastCleanup") + return + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -618,7 +729,12 @@ func (z *lockStats) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z lockStats) Msgsize() (s int) { - s = 1 + 6 + msgp.IntSize + 7 + msgp.IntSize + 6 + msgp.IntSize +func (z *lockStats) Msgsize() (s int) { + s = 1 + 6 + msgp.IntSize + 7 + msgp.IntSize + 6 + msgp.IntSize + 10 + msgp.IntSize + 15 + msgp.IntSize + 12 + if z.LastCleanup == nil { + s += msgp.NilSize + } else { + s += msgp.TimeSize + } return } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index efabef342..79e3dfdb9 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -33,8 +33,12 @@ type lockRESTServer struct { // RefreshHandler - refresh the current lock func (l *lockRESTServer) RefreshHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) { + // Add a timeout similar to what we expect upstream. + ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.RefreshCall) + defer cancel() + resp := lockRPCRefresh.NewResponse() - refreshed, err := l.ll.Refresh(context.Background(), *args) + refreshed, err := l.ll.Refresh(ctx, *args) if err != nil { return l.makeResp(resp, err) } @@ -46,8 +50,11 @@ func (l *lockRESTServer) RefreshHandler(args *dsync.LockArgs) (*dsync.LockResp, // LockHandler - Acquires a lock. func (l *lockRESTServer) LockHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) { + // Add a timeout similar to what we expect upstream. 
+ ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.Acquire) + defer cancel() resp := lockRPCLock.NewResponse() - success, err := l.ll.Lock(context.Background(), *args) + success, err := l.ll.Lock(ctx, *args) if err == nil && !success { return l.makeResp(resp, errLockConflict) } @@ -65,8 +72,11 @@ func (l *lockRESTServer) UnlockHandler(args *dsync.LockArgs) (*dsync.LockResp, * // RLockHandler - Acquires an RLock. func (l *lockRESTServer) RLockHandler(args *dsync.LockArgs) (*dsync.LockResp, *grid.RemoteErr) { + // Add a timeout similar to what we expect upstream. + ctx, cancel := context.WithTimeout(context.Background(), dsync.DefaultTimeouts.Acquire) + defer cancel() resp := lockRPCRLock.NewResponse() - success, err := l.ll.RLock(context.Background(), *args) + success, err := l.ll.RLock(ctx, *args) if err == nil && !success { err = errLockConflict } diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go index b682e0a45..ab58bb12c 100644 --- a/internal/dsync/drwmutex.go +++ b/internal/dsync/drwmutex.go @@ -443,6 +443,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is // Special context for NetLockers - do not use timeouts. // Also, pass the trace context info if found for debugging netLockCtx := context.Background() + tc, ok := ctx.Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt) if ok { netLockCtx = context.WithValue(netLockCtx, mcontext.ContextTraceKey, tc)
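
The gating pattern in this patch is small enough to demonstrate in isolation. The sketch below is a minimal, self-contained Go program that reproduces the idea behind getMutex and the lockMutexWaitLimit check as I read them from the diff above: an atomic counter tracks how many goroutines are queued on the mutex, new lock requests are rejected up front once that count passes a limit, and unlock-style calls are never rejected but still add to the count. The type and function names here (overloadLocker, acquire, TryLock, waitLimit) are illustrative only and are not part of the MinIO code in this patch.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// waitLimit mirrors the idea of lockMutexWaitLimit: reject new lock
// requests once this many goroutines are already queued for the mutex.
const waitLimit = 1000

var errOverloaded = errors.New("lock queue overloaded, retry later")

type overloadLocker struct {
	mu         sync.Mutex
	waiters    atomic.Int32 // goroutines currently queued for mu
	overloaded atomic.Int64 // rejected requests, kept for stats
	held       map[string]bool
}

// acquire locks mu and returns the matching unlock function, keeping the
// waiter count accurate for the time this goroutine spends blocked.
func (l *overloadLocker) acquire() func() {
	l.waiters.Add(1)
	l.mu.Lock()
	l.waiters.Add(-1)
	return l.mu.Unlock
}

// TryLock rejects immediately when too many goroutines are already queued;
// callers are expected to retry with backoff.
func (l *overloadLocker) TryLock(resource string) error {
	if l.waiters.Load() > waitLimit {
		l.overloaded.Add(1)
		return errOverloaded
	}
	// acquire() runs now; the returned unlock runs when TryLock returns.
	defer l.acquire()()
	if l.held[resource] {
		return errors.New("resource already locked")
	}
	l.held[resource] = true
	return nil
}

// Unlock is never rejected, but waiting here still adds to the queue count.
func (l *overloadLocker) Unlock(resource string) {
	defer l.acquire()()
	delete(l.held, resource)
}

func main() {
	l := &overloadLocker{held: map[string]bool{}}
	if err := l.TryLock("bucket/object"); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("locked")
	l.Unlock("bucket/object")
}

Rejecting before queuing keeps the mutex wait queue bounded under bursty or thundering-herd load instead of letting goroutines pile up behind the lock; as the patch's comment on lockMutexWaitLimit notes, rejected clients are expected to retry.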