mirror of
https://github.com/minio/minio.git
synced 2025-05-22 18:11:50 -04:00
parent
b1b0aadabf
commit
8b0ab6ead6
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/dsync"
|
"github.com/minio/minio/internal/dsync"
|
||||||
@ -50,7 +51,7 @@ func isWriteLock(lri []lockRequesterInfo) bool {
|
|||||||
|
|
||||||
// localLocker implements Dsync.NetLocker
|
// localLocker implements Dsync.NetLocker
|
||||||
type localLocker struct {
|
type localLocker struct {
|
||||||
mutex chan struct{}
|
mutex sync.Mutex
|
||||||
lockMap map[string][]lockRequesterInfo
|
lockMap map[string][]lockRequesterInfo
|
||||||
lockUID map[string]string // UUID -> resource map.
|
lockUID map[string]string // UUID -> resource map.
|
||||||
}
|
}
|
||||||
@ -69,41 +70,13 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// lockMu locks the "mutex" of the local locker.
|
|
||||||
// If "ctx" is canceled before the lock can be obtained false is returned.
|
|
||||||
// If "true" is returned unlockMu MUST be called.
|
|
||||||
// Behavior is similar to sync.Mutex.
|
|
||||||
func (l *localLocker) lockMu(ctx context.Context) (ok bool) {
|
|
||||||
select {
|
|
||||||
case l.mutex <- struct{}{}:
|
|
||||||
return true
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// lockMuBlock will block while getting the mutex.
|
|
||||||
// When returned unlockMu *must* be called.
|
|
||||||
// Behavior is similar to sync.Mutex.
|
|
||||||
func (l *localLocker) lockMuBlock() {
|
|
||||||
l.mutex <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlockMu unlocks an acquired mutex.
|
|
||||||
// This may only be called once after successfully getting a mutex.
|
|
||||||
func (l *localLocker) unlockMu() {
|
|
||||||
<-l.mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||||
if len(args.Resources) > maxDeleteList {
|
if len(args.Resources) > maxDeleteList {
|
||||||
return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
|
return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !l.lockMu(ctx) {
|
l.mutex.Lock()
|
||||||
return false, ctx.Err()
|
defer l.mutex.Unlock()
|
||||||
}
|
|
||||||
defer l.unlockMu()
|
|
||||||
|
|
||||||
if !l.canTakeLock(args.Resources...) {
|
if !l.canTakeLock(args.Resources...) {
|
||||||
// Not all locks can be taken on resources,
|
// Not all locks can be taken on resources,
|
||||||
@ -142,9 +115,8 @@ func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool
|
|||||||
return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
|
return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.lockMuBlock()
|
l.mutex.Lock()
|
||||||
defer l.unlockMu()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
err = nil
|
err = nil
|
||||||
|
|
||||||
for _, resource := range args.Resources {
|
for _, resource := range args.Resources {
|
||||||
@ -191,11 +163,8 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
|
|||||||
return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
|
return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !l.lockMu(ctx) {
|
l.mutex.Lock()
|
||||||
return false, ctx.Err()
|
defer l.mutex.Unlock()
|
||||||
}
|
|
||||||
defer l.unlockMu()
|
|
||||||
|
|
||||||
resource := args.Resources[0]
|
resource := args.Resources[0]
|
||||||
lrInfo := lockRequesterInfo{
|
lrInfo := lockRequesterInfo{
|
||||||
Name: resource,
|
Name: resource,
|
||||||
@ -227,9 +196,8 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
|
|||||||
return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
|
return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
|
||||||
}
|
}
|
||||||
|
|
||||||
l.lockMuBlock()
|
l.mutex.Lock()
|
||||||
defer l.unlockMu()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
var lri []lockRequesterInfo
|
var lri []lockRequesterInfo
|
||||||
|
|
||||||
resource := args.Resources[0]
|
resource := args.Resources[0]
|
||||||
@ -246,8 +214,8 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
|
func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
|
||||||
l.lockMuBlock()
|
l.mutex.Lock()
|
||||||
defer l.unlockMu()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
|
lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
|
||||||
for k, v := range l.lockMap {
|
for k, v := range l.lockMap {
|
||||||
@ -271,90 +239,97 @@ func (l *localLocker) IsLocal() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||||
l.lockMuBlock()
|
select {
|
||||||
defer l.unlockMu()
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
if len(args.UID) == 0 {
|
default:
|
||||||
for _, resource := range args.Resources {
|
l.mutex.Lock()
|
||||||
lris, ok := l.lockMap[resource]
|
defer l.mutex.Unlock()
|
||||||
if !ok {
|
if len(args.UID) == 0 {
|
||||||
continue
|
for _, resource := range args.Resources {
|
||||||
}
|
|
||||||
// Collect uids, so we don't mutate while we delete
|
|
||||||
uids := make([]string, 0, len(lris))
|
|
||||||
for _, lri := range lris {
|
|
||||||
uids = append(uids, lri.UID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete collected uids:
|
|
||||||
for _, uid := range uids {
|
|
||||||
lris, ok := l.lockMap[resource]
|
lris, ok := l.lockMap[resource]
|
||||||
if !ok {
|
if !ok {
|
||||||
// Just to be safe, delete uuids.
|
|
||||||
for idx := 0; idx < maxDeleteList; idx++ {
|
|
||||||
mapID := formatUUID(uid, idx)
|
|
||||||
if _, ok := l.lockUID[mapID]; !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
delete(l.lockUID, mapID)
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
|
// Collect uids, so we don't mutate while we delete
|
||||||
}
|
uids := make([]string, 0, len(lris))
|
||||||
}
|
for _, lri := range lris {
|
||||||
return true, nil
|
uids = append(uids, lri.UID)
|
||||||
}
|
}
|
||||||
|
|
||||||
idx := 0
|
// Delete collected uids:
|
||||||
for {
|
for _, uid := range uids {
|
||||||
mapID := formatUUID(args.UID, idx)
|
lris, ok := l.lockMap[resource]
|
||||||
resource, ok := l.lockUID[mapID]
|
if !ok {
|
||||||
if !ok {
|
// Just to be safe, delete uuids.
|
||||||
return idx > 0, nil
|
for idx := 0; idx < maxDeleteList; idx++ {
|
||||||
|
mapID := formatUUID(uid, idx)
|
||||||
|
if _, ok := l.lockUID[mapID]; !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
delete(l.lockUID, mapID)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
}
|
}
|
||||||
lris, ok := l.lockMap[resource]
|
|
||||||
if !ok {
|
idx := 0
|
||||||
// Unexpected inconsistency, delete.
|
for {
|
||||||
delete(l.lockUID, mapID)
|
mapID := formatUUID(args.UID, idx)
|
||||||
|
resource, ok := l.lockUID[mapID]
|
||||||
|
if !ok {
|
||||||
|
return idx > 0, nil
|
||||||
|
}
|
||||||
|
lris, ok := l.lockMap[resource]
|
||||||
|
if !ok {
|
||||||
|
// Unexpected inconsistency, delete.
|
||||||
|
delete(l.lockUID, mapID)
|
||||||
|
idx++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
reply = true
|
||||||
|
l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
|
||||||
idx++
|
idx++
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
reply = true
|
|
||||||
l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
|
|
||||||
idx++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
|
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
|
||||||
if !l.lockMu(ctx) {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
return false, ctx.Err()
|
return false, ctx.Err()
|
||||||
}
|
default:
|
||||||
defer l.unlockMu()
|
l.mutex.Lock()
|
||||||
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
// Check whether uid is still active.
|
// Check whether uid is still active.
|
||||||
resource, ok := l.lockUID[formatUUID(args.UID, 0)]
|
resource, ok := l.lockUID[formatUUID(args.UID, 0)]
|
||||||
if !ok {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
idx := 0
|
|
||||||
for {
|
|
||||||
lris, ok := l.lockMap[resource]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
// Inconsistent. Delete UID.
|
return false, nil
|
||||||
delete(l.lockUID, formatUUID(args.UID, idx))
|
|
||||||
return idx > 0, nil
|
|
||||||
}
|
}
|
||||||
for i := range lris {
|
idx := 0
|
||||||
if lris[i].UID == args.UID {
|
for {
|
||||||
lris[i].TimeLastRefresh = UTCNow()
|
lris, ok := l.lockMap[resource]
|
||||||
|
if !ok {
|
||||||
|
// Inconsistent. Delete UID.
|
||||||
|
delete(l.lockUID, formatUUID(args.UID, idx))
|
||||||
|
return idx > 0, nil
|
||||||
|
}
|
||||||
|
for i := range lris {
|
||||||
|
if lris[i].UID == args.UID {
|
||||||
|
lris[i].TimeLastRefresh = UTCNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idx++
|
||||||
|
resource, ok = l.lockUID[formatUUID(args.UID, idx)]
|
||||||
|
if !ok {
|
||||||
|
// No more resources for UID, but we did update at least one.
|
||||||
|
return true, nil
|
||||||
}
|
}
|
||||||
}
|
|
||||||
idx++
|
|
||||||
resource, ok = l.lockUID[formatUUID(args.UID, idx)]
|
|
||||||
if !ok {
|
|
||||||
// No more resources for UID, but we did update at least one.
|
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -362,8 +337,8 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
|
|||||||
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
|
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
|
||||||
// Caller must hold 'l.mutex' lock.
|
// Caller must hold 'l.mutex' lock.
|
||||||
func (l *localLocker) expireOldLocks(interval time.Duration) {
|
func (l *localLocker) expireOldLocks(interval time.Duration) {
|
||||||
l.lockMuBlock()
|
l.mutex.Lock()
|
||||||
defer l.unlockMu()
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
for k, lris := range l.lockMap {
|
for k, lris := range l.lockMap {
|
||||||
modified := false
|
modified := false
|
||||||
@ -394,7 +369,6 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
|
|||||||
|
|
||||||
func newLocker() *localLocker {
|
func newLocker() *localLocker {
|
||||||
return &localLocker{
|
return &localLocker{
|
||||||
mutex: make(chan struct{}, 1),
|
|
||||||
lockMap: make(map[string][]lockRequesterInfo, 1000),
|
lockMap: make(map[string][]lockRequesterInfo, 1000),
|
||||||
lockUID: make(map[string]string, 1000),
|
lockUID: make(map[string]string, 1000),
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/dsync"
|
"github.com/minio/minio/internal/dsync"
|
||||||
@ -37,7 +38,10 @@ func createLockTestServer(ctx context.Context, t *testing.T) (string, *lockRESTS
|
|||||||
}
|
}
|
||||||
|
|
||||||
locker := &lockRESTServer{
|
locker := &lockRESTServer{
|
||||||
ll: newLocker(),
|
ll: &localLocker{
|
||||||
|
mutex: sync.Mutex{},
|
||||||
|
lockMap: make(map[string][]lockRequesterInfo),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
creds := globalActiveCred
|
creds := globalActiveCred
|
||||||
token, err := authenticateNode(creds.AccessKey, creds.SecretKey, "")
|
token, err := authenticateNode(creds.AccessKey, creds.SecretKey, "")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user