mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: possiblity of double write lockers on same resource (#9616)
To avoid this issue with refCounter refactor the code such that - locker() always increases refCount upon success - unlocker() always decrements refCount upon success (as a special case removes the resource if the refCount is zero) By these two assumptions we are able to see that we are never granted two write lockers in any situation. Thanks to @vcabbage for writing a nice reproducer.
This commit is contained in:
parent
a3f41c7049
commit
6de410a0aa
@ -18,17 +18,18 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
pathutil "path"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/minio/lsync"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/dsync"
|
||||
)
|
||||
|
||||
@ -57,7 +58,7 @@ func newNSLock(isDistXL bool) *nsLockMap {
|
||||
|
||||
// nsLock - provides primitives for locking critical namespace regions.
|
||||
type nsLock struct {
|
||||
ref uint32
|
||||
ref int32
|
||||
*lsync.LRWMutex
|
||||
}
|
||||
|
||||
@ -72,21 +73,18 @@ type nsLockMap struct {
|
||||
|
||||
// Lock the namespace resource.
|
||||
func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
|
||||
var nsLk *nsLock
|
||||
|
||||
resource := pathJoin(volume, path)
|
||||
|
||||
n.lockMapMutex.Lock()
|
||||
if _, found := n.lockMap[resource]; !found {
|
||||
n.lockMap[resource] = &nsLock{
|
||||
nsLk, found := n.lockMap[resource]
|
||||
if !found {
|
||||
nsLk = &nsLock{
|
||||
LRWMutex: lsync.NewLRWMutex(ctx),
|
||||
ref: 1,
|
||||
}
|
||||
} else {
|
||||
// Update ref count here to avoid multiple races.
|
||||
atomic.AddUint32(&n.lockMap[resource].ref, 1)
|
||||
// Add a count to indicate that a parallel unlock doesn't clear this entry.
|
||||
}
|
||||
nsLk = n.lockMap[resource]
|
||||
nsLk.ref++
|
||||
n.lockMap[resource] = nsLk
|
||||
n.lockMapMutex.Unlock()
|
||||
|
||||
// Locking here will block (until timeout).
|
||||
@ -97,15 +95,19 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
|
||||
}
|
||||
|
||||
if !locked { // We failed to get the lock
|
||||
|
||||
// Decrement ref count since we failed to get the lock
|
||||
if atomic.AddUint32(&nsLk.ref, ^uint32(0)) == 0 {
|
||||
// Remove from the map if there are no more references.
|
||||
n.lockMapMutex.Lock()
|
||||
delete(n.lockMap, resource)
|
||||
n.lockMapMutex.Unlock()
|
||||
n.lockMapMutex.Lock()
|
||||
n.lockMap[resource].ref--
|
||||
if n.lockMap[resource].ref < 0 {
|
||||
logger.CriticalIf(GlobalContext, errors.New("resource reference count was lower than 0"))
|
||||
}
|
||||
if n.lockMap[resource].ref == 0 {
|
||||
// Remove from the map if there are no more references.
|
||||
delete(n.lockMap, resource)
|
||||
}
|
||||
n.lockMapMutex.Unlock()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -123,7 +125,11 @@ func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
|
||||
} else {
|
||||
n.lockMap[resource].Unlock()
|
||||
}
|
||||
if atomic.AddUint32(&n.lockMap[resource].ref, ^uint32(0)) == 0 {
|
||||
n.lockMap[resource].ref--
|
||||
if n.lockMap[resource].ref < 0 {
|
||||
logger.CriticalIf(GlobalContext, errors.New("resource reference count was lower than 0"))
|
||||
}
|
||||
if n.lockMap[resource].ref == 0 {
|
||||
// Remove from the map if there are no more references.
|
||||
delete(n.lockMap, resource)
|
||||
}
|
||||
|
@ -17,7 +17,10 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WARNING:
|
||||
@ -29,9 +32,69 @@ import (
|
||||
func TestGetSource(t *testing.T) {
|
||||
currentSource := func() string { return getSource(2) }
|
||||
gotSource := currentSource()
|
||||
// Hard coded line number, 31, in the "expectedSource" value
|
||||
expectedSource := "[namespace-lock_test.go:31:TestGetSource()]"
|
||||
// Hard coded line number, 34, in the "expectedSource" value
|
||||
expectedSource := "[namespace-lock_test.go:34:TestGetSource()]"
|
||||
if gotSource != expectedSource {
|
||||
t.Errorf("expected : %s, got : %s", expectedSource, gotSource)
|
||||
}
|
||||
}
|
||||
|
||||
// Test lock race
|
||||
func TestNSLockRace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 10000; i++ {
|
||||
nsLk := newNSLock(false)
|
||||
|
||||
// lk1; ref=1
|
||||
if !nsLk.lock(ctx, "volume", "path", "source", "opsID", false, time.Second) {
|
||||
t.Fatal("failed to acquire lock")
|
||||
}
|
||||
|
||||
// lk2
|
||||
lk2ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(lk2ch)
|
||||
nsLk.lock(ctx, "volume", "path", "source", "opsID", false, 1*time.Millisecond)
|
||||
}()
|
||||
time.Sleep(1 * time.Millisecond) // wait for goroutine to advance; ref=2
|
||||
|
||||
// Unlock the 1st lock; ref=1 after this line
|
||||
nsLk.unlock("volume", "path", false)
|
||||
|
||||
// Taking another lockMapMutex here allows queuing up additional lockers. This should
|
||||
// not be required but makes reproduction much easier.
|
||||
nsLk.lockMapMutex.Lock()
|
||||
|
||||
// lk3 blocks.
|
||||
lk3ch := make(chan bool)
|
||||
go func() {
|
||||
lk3ch <- nsLk.lock(ctx, "volume", "path", "source", "opsID", false, 0)
|
||||
}()
|
||||
|
||||
// lk4, blocks.
|
||||
lk4ch := make(chan bool)
|
||||
go func() {
|
||||
lk4ch <- nsLk.lock(ctx, "volume", "path", "source", "opsID", false, 0)
|
||||
}()
|
||||
runtime.Gosched()
|
||||
|
||||
// unlock the manual lock
|
||||
nsLk.lockMapMutex.Unlock()
|
||||
|
||||
// To trigger the race:
|
||||
// 1) lk3 or lk4 need to advance and increment the ref on the existing resource,
|
||||
// successfully acquiring the lock.
|
||||
// 2) lk2 then needs to advance and remove the resource from lockMap.
|
||||
// 3) lk3 or lk4 (whichever didn't execute in step 1) then executes and creates
|
||||
// a new entry in lockMap and acquires a lock for the same resource.
|
||||
|
||||
<-lk2ch
|
||||
lk3ok := <-lk3ch
|
||||
lk4ok := <-lk4ch
|
||||
|
||||
if lk3ok && lk4ok {
|
||||
t.Fatalf("multiple locks acquired; iteration=%d, lk3=%t, lk4=%t", i, lk3ok, lk4ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user