minio/cmd/local-locker.go

289 lines
7.9 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"sync"
"time"
"github.com/minio/minio/internal/dsync"
)
// lockRequesterInfo stores various info from the client for each lock that is requested.
type lockRequesterInfo struct {
Name string // name of the resource lock was requested for
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastRefresh time.Time // Timestamp for last lock refresh.
Source string // Contains line, function and filename reqesting the lock.
Group bool // indicates if it was a group lock.
// Owner represents the UUID of the owner who originally requested the lock
// useful in expiry.
Owner string
// Quorum represents the quorum required for this lock to be active.
Quorum int
}
// isWriteLock returns whether the lock is a write or read lock.
func isWriteLock(lri []lockRequesterInfo) bool {
return len(lri) == 1 && lri[0].Writer
}
// localLocker implements Dsync.NetLocker
type localLocker struct {
mutex sync.Mutex
lockMap map[string][]lockRequesterInfo
}
func (l *localLocker) String() string {
return globalEndpoints.Localhost()
}
func (l *localLocker) canTakeUnlock(resources ...string) bool {
var lkCnt int
for _, resource := range resources {
isWriteLockTaken := isWriteLock(l.lockMap[resource])
if isWriteLockTaken {
lkCnt++
}
}
return lkCnt == len(resources)
}
func (l *localLocker) canTakeLock(resources ...string) bool {
var noLkCnt int
for _, resource := range resources {
_, lockTaken := l.lockMap[resource]
if !lockTaken {
noLkCnt++
}
}
return noLkCnt == len(resources)
}
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.canTakeLock(args.Resources...) {
// Not all locks can be taken on resources,
// reject it completely.
return false, nil
}
// No locks held on the all resources, so claim write
// lock on all resources at once.
for _, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
Name: resource,
Writer: true,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
},
}
}
return true, nil
}
func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.canTakeUnlock(args.Resources...) {
// Unless it is a write lock reject it.
return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources)
}
for _, resource := range args.Resources {
lri, ok := l.lockMap[resource]
if ok {
l.removeEntry(resource, args, &lri)
}
}
return true, nil
}
// removeEntry based on the uid of the lock message, removes a single entry from the
// lockRequesterInfo array or the whole array from the map (in case of a write lock
// or last read lock)
func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool {
// Find correct entry to remove based on uid.
for index, entry := range *lri {
if entry.UID == args.UID && entry.Owner == args.Owner {
if len(*lri) == 1 {
// Remove the write lock.
delete(l.lockMap, name)
} else {
// Remove the appropriate read lock.
*lri = append((*lri)[:index], (*lri)[index+1:]...)
l.lockMap[name] = *lri
}
return true
}
}
// None found return false, perhaps entry removed in previous run.
return false
}
func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0]
lrInfo := lockRequesterInfo{
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Quorum: args.Quorum,
}
if lri, ok := l.lockMap[resource]; ok {
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
}
} else {
// No locks held on the given name, so claim (first) read lock
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
reply = true
}
return reply, nil
}
func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
var lri []lockRequesterInfo
resource := args.Resources[0]
if lri, reply = l.lockMap[resource]; !reply {
// No lock is held on the given name
return true, nil
}
if reply = !isWriteLock(lri); !reply {
// A write-lock is held, cannot release a read lock
return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
}
l.removeEntry(resource, args, &lri)
return reply, nil
}
func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
l.mutex.Lock()
defer l.mutex.Unlock()
lockCopy := map[string][]lockRequesterInfo{}
for k, v := range l.lockMap {
lockCopy[k] = append(lockCopy[k], v...)
}
return lockCopy
}
func (l *localLocker) Close() error {
return nil
}
// IsOnline - local locker is always online.
func (l *localLocker) IsOnline() bool {
return true
}
// IsLocal - local locker returns true.
func (l *localLocker) IsLocal() bool {
return true
}
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
}
for _, resource := range args.Resources {
delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock)
}
return true, nil
}
}
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0] // refresh check is always per resource.
// Lock found, proceed to verify if belongs to given uid.
lri, ok := l.lockMap[resource]
if !ok {
// lock doesn't exist yet, return false
return false, nil
}
// Check whether uid is still active
for i := range lri {
if lri[i].UID == args.UID && lri[i].Owner == args.Owner {
lri[i].TimeLastRefresh = UTCNow()
return true, nil
}
}
return false, nil
}
}
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
// Caller must hold 'l.mutex' lock.
func (l *localLocker) expireOldLocks(interval time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, lris := range l.lockMap {
for _, lri := range lris {
if time.Since(lri.TimeLastRefresh) > interval {
l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris)
}
}
}
}
func newLocker() *localLocker {
return &localLocker{
lockMap: make(map[string][]lockRequesterInfo),
}
}