diff --git a/cmd/local-locker.go b/cmd/local-locker.go index f24ecad74..17ac4d05c 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -20,6 +20,8 @@ package cmd import ( "context" "fmt" + "math" + "strconv" "sync" "time" @@ -40,6 +42,7 @@ type lockRequesterInfo struct { Owner string // Quorum represents the quorum required for this lock to be active. Quorum int + idx int } // isWriteLock returns whether the lock is a write or read lock. @@ -51,6 +54,7 @@ func isWriteLock(lri []lockRequesterInfo) bool { type localLocker struct { mutex sync.Mutex lockMap map[string][]lockRequesterInfo + lockUID map[string]string // UUID -> resource map. } func (l *localLocker) String() string { @@ -58,25 +62,22 @@ func (l *localLocker) String() string { } func (l *localLocker) canTakeUnlock(resources ...string) bool { - var lkCnt int for _, resource := range resources { - isWriteLockTaken := isWriteLock(l.lockMap[resource]) - if isWriteLockTaken { - lkCnt++ + if !isWriteLock(l.lockMap[resource]) { + return false } } - return lkCnt == len(resources) + return true } func (l *localLocker) canTakeLock(resources ...string) bool { - var noLkCnt int for _, resource := range resources { _, lockTaken := l.lockMap[resource] - if !lockTaken { - noLkCnt++ + if lockTaken { + return false } } - return noLkCnt == len(resources) + return true } func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { @@ -91,7 +92,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool // No locks held on the all resources, so claim write // lock on all resources at once. - for _, resource := range args.Resources { + for i, resource := range args.Resources { l.lockMap[resource] = []lockRequesterInfo{ { Name: resource, @@ -103,37 +104,46 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool TimeLastRefresh: UTCNow(), Group: len(args.Resources) > 1, Quorum: args.Quorum, + idx: i, }, } + l.lockUID[formatUUID(args.UID, i)] = resource } return true, nil } +func formatUUID(s string, idx int) string { + return s + strconv.Itoa(idx) +} + func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() + err = nil - if !l.canTakeUnlock(args.Resources...) { - // Unless it is a write lock reject it. - return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources) - } for _, resource := range args.Resources { + if !l.canTakeUnlock(resource) { + // Unless it is a write lock reject it. + err = fmt.Errorf("unlock attempted on a read locked entity: %s", resource) + continue + } lri, ok := l.lockMap[resource] if ok { - l.removeEntry(resource, args, &lri) + reply = l.removeEntry(resource, args, &lri) || reply } } - return true, nil + return } // removeEntry based on the uid of the lock message, removes a single entry from the // lockRequesterInfo array or the whole array from the map (in case of a write lock // or last read lock) +// UID and optionally owner must match for entries to be deleted. func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool { // Find correct entry to remove based on uid. for index, entry := range *lri { - if entry.UID == args.UID && entry.Owner == args.Owner { + if entry.UID == args.UID && (args.Owner == "" || entry.Owner == args.Owner) { if len(*lri) == 1 { // Remove the write lock. delete(l.lockMap, name) @@ -142,6 +152,7 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR *lri = append((*lri)[:index], (*lri)[index+1:]...) l.lockMap[name] = *lri } + delete(l.lockUID, formatUUID(args.UID, entry.idx)) return true } } @@ -151,6 +162,10 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR } func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + if len(args.Resources) > 1 { + return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource") + } + l.mutex.Lock() defer l.mutex.Unlock() resource := args.Resources[0] @@ -168,16 +183,22 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo if reply = !isWriteLock(lri); reply { // Unless there is a write lock l.lockMap[resource] = append(l.lockMap[resource], lrInfo) + l.lockUID[args.UID] = formatUUID(resource, 0) } } else { // No locks held on the given name, so claim (first) read lock l.lockMap[resource] = []lockRequesterInfo{lrInfo} + l.lockUID[args.UID] = formatUUID(resource, 0) reply = true } return reply, nil } func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { + if len(args.Resources) > 1 { + return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource") + } + l.mutex.Lock() defer l.mutex.Unlock() var lri []lockRequesterInfo @@ -187,9 +208,9 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo // No lock is held on the given name return true, nil } - if reply = !isWriteLock(lri); !reply { + if isWriteLock(lri) { // A write-lock is held, cannot release a read lock - return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource) + return false, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource) } l.removeEntry(resource, args, &lri) return reply, nil @@ -199,9 +220,9 @@ func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo { l.mutex.Lock() defer l.mutex.Unlock() - lockCopy := map[string][]lockRequesterInfo{} + lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap)) for k, v := range l.lockMap { - lockCopy[k] = append(lockCopy[k], v...) + lockCopy[k] = append(make([]lockRequesterInfo, 0, len(v)), v...) } return lockCopy } @@ -229,22 +250,53 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep defer l.mutex.Unlock() if len(args.UID) == 0 { for _, resource := range args.Resources { - delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock) + lris, ok := l.lockMap[resource] + if !ok { + continue + } + // Collect uids, so we don't mutate while we delete + uids := make([]string, 0, len(lris)) + for _, lri := range lris { + uids = append(uids, lri.UID) + } + + // Delete collected uids: + for _, uid := range uids { + lris, ok := l.lockMap[resource] + if !ok { + // Just to be safe, delete uuids. + for idx := 0; idx < math.MaxInt32; idx++ { + mapID := formatUUID(uid, idx) + if _, ok := l.lockUID[mapID]; !ok { + break + } + delete(l.lockUID, mapID) + } + continue + } + l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris) + } } return true, nil } - lockFound := false - for _, lris := range l.lockMap { - for _, lri := range lris { - if lri.UID == args.UID { - l.removeEntry(lri.Name, dsync.LockArgs{UID: lri.UID}, &lris) - lockFound = true - } + idx := 0 + for { + resource, ok := l.lockUID[formatUUID(args.UID, idx)] + if !ok { + return idx > 0, nil } + lris, ok := l.lockMap[resource] + if !ok { + // Unexpected inconsistency, delete. + delete(l.lockUID, formatUUID(args.UID, idx)) + idx++ + continue + } + reply = true + l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris) + idx++ } - - return lockFound, nil } } @@ -256,18 +308,31 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh l.mutex.Lock() defer l.mutex.Unlock() - lockFound := false - for _, lri := range l.lockMap { - // Check whether uid is still active - for i := range lri { - if lri[i].UID == args.UID { - lri[i].TimeLastRefresh = UTCNow() - lockFound = true + // Check whether uid is still active. + resource, ok := l.lockUID[formatUUID(args.UID, 0)] + if !ok { + return false, nil + } + idx := 0 + for { + lris, ok := l.lockMap[resource] + if !ok { + // Inconsistent. Delete UID. + delete(l.lockUID, formatUUID(args.UID, idx)) + return idx > 0, nil + } + for i := range lris { + if lris[i].UID == args.UID { + lris[i].TimeLastRefresh = UTCNow() } } + idx++ + resource, ok = l.lockUID[formatUUID(args.UID, idx)] + if !ok { + // No more resources for UID, but we did update at least one. + return true, nil + } } - - return lockFound, nil } } @@ -277,10 +342,24 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { l.mutex.Lock() defer l.mutex.Unlock() - for _, lris := range l.lockMap { - for _, lri := range lris { - if time.Since(lri.TimeLastRefresh) > interval { - l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris) + for k := range l.lockMap { + found := false + // Since we mutate the value, remove one per loop. + for { + lris, ok := l.lockMap[k] + if !ok { + break + } + for _, lri := range lris { + if time.Since(lri.TimeLastRefresh) > interval { + l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris) + found = true + break + } + } + // We did not find any more to expire. + if !found { + break } } } @@ -288,6 +367,7 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { func newLocker() *localLocker { return &localLocker{ - lockMap: make(map[string][]lockRequesterInfo), + lockMap: make(map[string][]lockRequesterInfo, 1000), + lockUID: make(map[string]string, 1000), } } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 88f832413..353f86cc0 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -34,7 +34,7 @@ const ( lockMaintenanceInterval = 1 * time.Minute // Lock validity duration - lockValidityDuration = 20 * time.Second + lockValidityDuration = 1 * time.Minute ) // To abstract a node over network. diff --git a/cmd/lock-rest-server_test.go b/cmd/lock-rest-server_test.go index 1a8fee8c4..93018a577 100644 --- a/cmd/lock-rest-server_test.go +++ b/cmd/lock-rest-server_test.go @@ -44,13 +44,12 @@ func BenchmarkLockArgs(b *testing.B) { b.Fatal(err) } - req := &http.Request{ - Body: ioutil.NopCloser(bytes.NewReader(argBytes)), - } + req := &http.Request{} b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { + req.Body = ioutil.NopCloser(bytes.NewReader(argBytes)) getLockArgs(req) } } @@ -64,12 +63,12 @@ func BenchmarkLockArgsOld(b *testing.B) { req := &http.Request{ Form: values, - Body: ioutil.NopCloser(bytes.NewReader([]byte(`obj.txt`))), } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { + req.Body = ioutil.NopCloser(bytes.NewReader([]byte(`obj.txt`))) getLockArgsOld(req) } }