// Copyright (c) 2015-2021 MinIO, Inc. // // This file is part of MinIO Object Storage stack // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package cmd import ( "context" "fmt" "sync" "time" "github.com/minio/minio/pkg/dsync" ) // lockRequesterInfo stores various info from the client for each lock that is requested. type lockRequesterInfo struct { Name string // name of the resource lock was requested for Writer bool // Bool whether write or read lock. UID string // UID to uniquely identify request of client. Timestamp time.Time // Timestamp set at the time of initialization. TimeLastRefresh time.Time // Timestamp for last lock refresh. Source string // Contains line, function and filename reqesting the lock. Group bool // indicates if it was a group lock. // Owner represents the UUID of the owner who originally requested the lock // useful in expiry. Owner string // Quorum represents the quorum required for this lock to be active. Quorum int } // isWriteLock returns whether the lock is a write or read lock. func isWriteLock(lri []lockRequesterInfo) bool { return len(lri) == 1 && lri[0].Writer } // localLocker implements Dsync.NetLocker type localLocker struct { mutex sync.Mutex lockMap map[string][]lockRequesterInfo } func (l *localLocker) String() string { return globalEndpoints.Localhost() } func (l *localLocker) canTakeUnlock(resources ...string) bool { var lkCnt int for _, resource := range resources { isWriteLockTaken := isWriteLock(l.lockMap[resource]) if isWriteLockTaken { lkCnt++ } } return lkCnt == len(resources) } func (l *localLocker) canTakeLock(resources ...string) bool { var noLkCnt int for _, resource := range resources { _, lockTaken := l.lockMap[resource] if !lockTaken { noLkCnt++ } } return noLkCnt == len(resources) } func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() if !l.canTakeLock(args.Resources...) { // Not all locks can be taken on resources, // reject it completely. return false, nil } // No locks held on the all resources, so claim write // lock on all resources at once. for _, resource := range args.Resources { l.lockMap[resource] = []lockRequesterInfo{ { Name: resource, Writer: true, Source: args.Source, Owner: args.Owner, UID: args.UID, Timestamp: UTCNow(), TimeLastRefresh: UTCNow(), Group: len(args.Resources) > 1, Quorum: args.Quorum, }, } } return true, nil } func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() if !l.canTakeUnlock(args.Resources...) { // Unless it is a write lock reject it. return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources) } for _, resource := range args.Resources { lri, ok := l.lockMap[resource] if ok { l.removeEntry(resource, args, &lri) } } return true, nil } // removeEntry based on the uid of the lock message, removes a single entry from the // lockRequesterInfo array or the whole array from the map (in case of a write lock // or last read lock) func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool { // Find correct entry to remove based on uid. for index, entry := range *lri { if entry.UID == args.UID && entry.Owner == args.Owner { if len(*lri) == 1 { // Remove the write lock. delete(l.lockMap, name) } else { // Remove the appropriate read lock. *lri = append((*lri)[:index], (*lri)[index+1:]...) l.lockMap[name] = *lri } return true } } // None found return false, perhaps entry removed in previous run. return false } func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() resource := args.Resources[0] lrInfo := lockRequesterInfo{ Name: resource, Writer: false, Source: args.Source, Owner: args.Owner, UID: args.UID, Timestamp: UTCNow(), TimeLastRefresh: UTCNow(), Quorum: args.Quorum, } if lri, ok := l.lockMap[resource]; ok { if reply = !isWriteLock(lri); reply { // Unless there is a write lock l.lockMap[resource] = append(l.lockMap[resource], lrInfo) } } else { // No locks held on the given name, so claim (first) read lock l.lockMap[resource] = []lockRequesterInfo{lrInfo} reply = true } return reply, nil } func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() var lri []lockRequesterInfo resource := args.Resources[0] if lri, reply = l.lockMap[resource]; !reply { // No lock is held on the given name return true, nil } if reply = !isWriteLock(lri); !reply { // A write-lock is held, cannot release a read lock return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource) } l.removeEntry(resource, args, &lri) return reply, nil } func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo { l.mutex.Lock() defer l.mutex.Unlock() lockCopy := map[string][]lockRequesterInfo{} for k, v := range l.lockMap { lockCopy[k] = append(lockCopy[k], v...) } return lockCopy } func (l *localLocker) Close() error { return nil } // IsOnline - local locker is always online. func (l *localLocker) IsOnline() bool { return true } // IsLocal - local locker returns true. func (l *localLocker) IsLocal() bool { return true } func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { select { case <-ctx.Done(): return false, ctx.Err() default: l.mutex.Lock() defer l.mutex.Unlock() if len(args.UID) != 0 { return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) } for _, resource := range args.Resources { delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock) } return true, nil } } func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) { select { case <-ctx.Done(): return false, ctx.Err() default: l.mutex.Lock() defer l.mutex.Unlock() resource := args.Resources[0] // refresh check is always per resource. // Lock found, proceed to verify if belongs to given uid. lri, ok := l.lockMap[resource] if !ok { // lock doesn't exist yet, return false return false, nil } // Check whether uid is still active for i := range lri { if lri[i].UID == args.UID && lri[i].Owner == args.Owner { lri[i].TimeLastRefresh = UTCNow() return true, nil } } return false, nil } } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Caller must hold 'l.mutex' lock. func (l *localLocker) expireOldLocks(interval time.Duration) { l.mutex.Lock() defer l.mutex.Unlock() for _, lris := range l.lockMap { for _, lri := range lris { if time.Since(lri.TimeLastRefresh) > interval { l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris) } } } } func newLocker() *localLocker { return &localLocker{ lockMap: make(map[string][]lockRequesterInfo), } }