tests: Clean up dsync package (#14415)
Add non-constant timeouts to dsync package. Reduce test runtime by minutes. Hopefully not too aggressive.
commit b030ef1aca
parent cc46a99f97
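For orientation before the diff: the per-operation timeouts stop being package-level constants and become an exported Timeouts struct carried on Dsync (with DefaultTimeouts preserving the previous production values), and the refresh and lock-retry intervals become per-DRWMutex fields so tests can shorten them. Below is a minimal sketch of what calling code looks like after this change; the package name, import path, and the lockers argument are assumptions, while the dsync identifiers come from the diff that follows:

package example // hypothetical caller inside the MinIO module

import (
    "time"

    "github.com/minio/minio/internal/dsync" // assumed import path
)

// newDistLock mirrors what production callers do: pass the package defaults through.
func newDistLock(lockers func() ([]dsync.NetLocker, string), name string) *dsync.DRWMutex {
    ds := &dsync.Dsync{
        GetLockers: lockers,
        Timeouts:   dsync.DefaultTimeouts, // formerly hard-coded constants
    }
    return dsync.NewDRWMutex(ds, name)
}

// Tests can instead supply much shorter, explicit timeouts (values illustrative).
var testTimeouts = dsync.Timeouts{
    Acquire:         250 * time.Millisecond,
    RefreshCall:     250 * time.Millisecond,
    UnlockCall:      250 * time.Millisecond,
    ForceUnlockCall: 250 * time.Millisecond,
}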
@@ -225,6 +225,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
 	if n.isDistErasure {
 		drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
 			GetLockers: lockers,
+			Timeouts:   dsync.DefaultTimeouts,
 		}, pathsJoinPrefix(volume, paths...)...)
 		return &distLockInstance{drwmutex, opsID}
 	}
@@ -43,32 +43,61 @@ func log(format string, data ...interface{}) {
 	}
 }
 
-// dRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
-const drwMutexAcquireTimeout = 1 * time.Second // 1 second.
-
-// dRWMutexRefreshTimeout - timeout for the refresh call
-const drwMutexRefreshCallTimeout = 5 * time.Second
-
-// dRWMutexUnlockTimeout - timeout for the unlock call
-const drwMutexUnlockCallTimeout = 30 * time.Second
-
-// dRWMutexForceUnlockTimeout - timeout for the unlock call
-const drwMutexForceUnlockCallTimeout = 30 * time.Second
-
-// dRWMutexRefreshInterval - the interval between two refresh calls
-const drwMutexRefreshInterval = 10 * time.Second
-
-const drwMutexInfinite = 1<<63 - 1
+const (
+	// dRWMutexAcquireTimeout - default tolerance limit to wait for lock acquisition before.
+	drwMutexAcquireTimeout = 1 * time.Second // 1 second.
+
+	// dRWMutexRefreshTimeout - default timeout for the refresh call
+	drwMutexRefreshCallTimeout = 5 * time.Second
+
+	// dRWMutexUnlockTimeout - default timeout for the unlock call
+	drwMutexUnlockCallTimeout = 30 * time.Second
+
+	// dRWMutexForceUnlockTimeout - default timeout for the force unlock call
+	drwMutexForceUnlockCallTimeout = 30 * time.Second
+
+	// dRWMutexRefreshInterval - default interval between two refresh calls
+	drwMutexRefreshInterval = 10 * time.Second
+
+	lockRetryInterval = 1 * time.Second
+
+	drwMutexInfinite = 1<<63 - 1
+)
+
+// Timeouts are timeouts for specific operations.
+type Timeouts struct {
+	// Acquire - tolerance limit to wait for lock acquisition before.
+	Acquire time.Duration
+
+	// RefreshCall - timeout for the refresh call
+	RefreshCall time.Duration
+
+	// UnlockCall - timeout for the unlock call
+	UnlockCall time.Duration
+
+	// ForceUnlockCall - timeout for the force unlock call
+	ForceUnlockCall time.Duration
+}
+
+// DefaultTimeouts contains default timeouts.
+var DefaultTimeouts = Timeouts{
+	Acquire:         drwMutexAcquireTimeout,
+	RefreshCall:     drwMutexRefreshCallTimeout,
+	UnlockCall:      drwMutexUnlockCallTimeout,
+	ForceUnlockCall: drwMutexForceUnlockCallTimeout,
+}
 
 // A DRWMutex is a distributed mutual exclusion lock.
 type DRWMutex struct {
 	Names         []string
 	writeLocks    []string // Array of nodes that granted a write lock
 	readLocks     []string // Array of array of nodes that granted reader locks
 	rng           *rand.Rand
 	m             sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
 	clnt          *Dsync
 	cancelRefresh context.CancelFunc
+	refreshInterval   time.Duration
+	lockRetryInterval time.Duration
 }
 
 // Granted - represents a structure of a granted lock.
@@ -90,11 +119,13 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
 	restClnts, _ := clnt.GetLockers()
 	sort.Strings(names)
 	return &DRWMutex{
 		writeLocks: make([]string, len(restClnts)),
 		readLocks:  make([]string, len(restClnts)),
 		Names:      names,
 		clnt:       clnt,
 		rng:        rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
+		refreshInterval:   drwMutexRefreshInterval,
+		lockRetryInterval: lockRetryInterval,
 	}
 }
 
@@ -146,10 +177,6 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, cancel context.CancelFunc, id,
 	return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts)
 }
 
-const (
-	lockRetryInterval = 1 * time.Second
-)
-
 // lockBlocking will try to acquire either a read or a write lock
 //
 // The function will loop using a built-in timing randomized back-off
@@ -209,7 +236,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 				return locked
 			}
 
-			time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+			time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 		}
 	}
 }
@@ -224,7 +251,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 	go func() {
 		defer cancel()
 
-		refreshTimer := time.NewTimer(drwMutexRefreshInterval)
+		refreshTimer := time.NewTimer(dm.refreshInterval)
 		defer refreshTimer.Stop()
 
 		for {
@@ -232,7 +259,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 			case <-ctx.Done():
 				return
 			case <-refreshTimer.C:
-				refreshTimer.Reset(drwMutexRefreshInterval)
+				refreshTimer.Reset(dm.refreshInterval)
 
 				noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
 				if err == nil && noQuorum {
@@ -250,7 +277,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 }
 
 func forceUnlock(ctx context.Context, ds *Dsync, id string) {
-	ctx, cancel := context.WithTimeout(ctx, drwMutexForceUnlockCallTimeout)
+	ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.ForceUnlockCall)
 	defer cancel()
 
 	restClnts, _ := ds.GetLockers()
@@ -300,7 +327,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int)
 			return
 		}
 
-		ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
+		ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.RefreshCall)
 		defer cancel()
 
 		refreshed, err := c.Refresh(ctx, args)
@@ -379,7 +406,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	}
 
 	// Combined timeout for the lock attempt.
-	ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
+	ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire)
 	defer cancel()
 	for index, c := range restClnts {
 		wg.Add(1)
@@ -573,7 +600,7 @@ func (dm *DRWMutex) Unlock() {
 
 	isReadLock := false
 	for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
-		time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+		time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 	}
 }
 
@@ -614,7 +641,7 @@ func (dm *DRWMutex) RUnlock() {
 
 	isReadLock := true
 	for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
-		time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
+		time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
 	}
 }
 
@@ -635,7 +662,7 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
 		Resources: names,
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), drwMutexUnlockCallTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
 	defer cancel()
 
 	if isReadLock {
@@ -47,13 +47,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	// fmt.Println("2nd read lock acquired, waiting...")
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		time.Sleep(2 * testDrwMutexAcquireTimeout)
 		drwm.RUnlock()
 		// fmt.Println("1st read lock released, waiting...")
 	}()
 
 	go func() {
-		time.Sleep(3 * time.Second)
+		time.Sleep(3 * testDrwMutexAcquireTimeout)
 		drwm.RUnlock()
 		// fmt.Println("2nd read lock released, waiting...")
 	}()
@@ -63,7 +63,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	locked = drwm.GetLock(ctx3, cancel3, id, source, Options{Timeout: duration})
 	if locked {
 		// fmt.Println("Write lock acquired, waiting...")
-		time.Sleep(time.Second)
+		time.Sleep(testDrwMutexAcquireTimeout)
 
 		drwm.Unlock()
 	}
@@ -72,7 +72,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 }
 
 func TestSimpleWriteLockAcquired(t *testing.T) {
-	locked := testSimpleWriteLock(t, 5*time.Second)
+	locked := testSimpleWriteLock(t, 10*testDrwMutexAcquireTimeout)
 
 	expected := true
 	if locked != expected {
@@ -81,7 +81,7 @@ func TestSimpleWriteLockAcquired(t *testing.T) {
 }
 
 func TestSimpleWriteLockTimedOut(t *testing.T) {
-	locked := testSimpleWriteLock(t, time.Second)
+	locked := testSimpleWriteLock(t, testDrwMutexAcquireTimeout)
 
 	expected := false
 	if locked != expected {
@@ -99,7 +99,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		time.Sleep(3 * testDrwMutexAcquireTimeout)
 		drwm.Unlock()
 		// fmt.Println("Initial write lock released, waiting...")
 	}()
@@ -109,7 +109,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	locked = drwm.GetLock(ctx2, cancel2, id, source, Options{Timeout: duration})
 	if locked {
 		// fmt.Println("2nd write lock acquired, waiting...")
-		time.Sleep(time.Second)
+		time.Sleep(testDrwMutexAcquireTimeout)
 
 		drwm.Unlock()
 	}
@@ -118,7 +118,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 }
 
 func TestDualWriteLockAcquired(t *testing.T) {
-	locked := testDualWriteLock(t, 5*time.Second)
+	locked := testDualWriteLock(t, 10*testDrwMutexAcquireTimeout)
 
 	expected := true
 	if locked != expected {
@@ -127,7 +127,7 @@ func TestDualWriteLockAcquired(t *testing.T) {
 }
 
 func TestDualWriteLockTimedOut(t *testing.T) {
-	locked := testDualWriteLock(t, time.Second)
+	locked := testDualWriteLock(t, testDrwMutexAcquireTimeout)
 
 	expected := false
 	if locked != expected {
@@ -214,25 +214,27 @@ func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool)
 }
 
 // Borrowed from rwmutex_test.go
-func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
-	runtime.GOMAXPROCS(gomaxprocs)
-	// Number of active readers + 10000 * number of active writers.
-	var activity int32
-	rwm := NewDRWMutex(ds, "test")
-	cdone := make(chan bool)
-	go writer(rwm, numIterations, &activity, cdone)
-	var i int
-	for i = 0; i < numReaders/2; i++ {
-		go reader(rwm, numIterations, &activity, cdone)
-	}
-	go writer(rwm, numIterations, &activity, cdone)
-	for ; i < numReaders; i++ {
-		go reader(rwm, numIterations, &activity, cdone)
-	}
-	// Wait for the 2 writers and all readers to finish.
-	for i := 0; i < 2+numReaders; i++ {
-		<-cdone
-	}
+func hammerRWMutex(t *testing.T, gomaxprocs, numReaders, numIterations int) {
+	t.Run(fmt.Sprintf("%d-%d-%d", gomaxprocs, numReaders, numIterations), func(t *testing.T) {
+		runtime.GOMAXPROCS(gomaxprocs)
+		// Number of active readers + 10000 * number of active writers.
+		var activity int32
+		rwm := NewDRWMutex(ds, "test")
+		cdone := make(chan bool)
+		go writer(rwm, numIterations, &activity, cdone)
+		var i int
+		for i = 0; i < numReaders/2; i++ {
+			go reader(rwm, numIterations, &activity, cdone)
+		}
+		go writer(rwm, numIterations, &activity, cdone)
+		for ; i < numReaders; i++ {
+			go reader(rwm, numIterations, &activity, cdone)
+		}
+		// Wait for the 2 writers and all readers to finish.
+		for i := 0; i < 2+numReaders; i++ {
+			<-cdone
+		}
+	})
 }
 
 // Borrowed from rwmutex_test.go
@@ -242,16 +244,16 @@ func TestRWMutex(t *testing.T) {
 	if testing.Short() {
 		n = 5
 	}
-	HammerRWMutex(1, 1, n)
-	HammerRWMutex(1, 3, n)
-	HammerRWMutex(1, 10, n)
-	HammerRWMutex(4, 1, n)
-	HammerRWMutex(4, 3, n)
-	HammerRWMutex(4, 10, n)
-	HammerRWMutex(10, 1, n)
-	HammerRWMutex(10, 3, n)
-	HammerRWMutex(10, 10, n)
-	HammerRWMutex(10, 5, n)
+	hammerRWMutex(t, 1, 1, n)
+	hammerRWMutex(t, 1, 3, n)
+	hammerRWMutex(t, 1, 10, n)
+	hammerRWMutex(t, 4, 1, n)
+	hammerRWMutex(t, 4, 3, n)
+	hammerRWMutex(t, 4, 10, n)
+	hammerRWMutex(t, 10, 1, n)
+	hammerRWMutex(t, 10, 3, n)
+	hammerRWMutex(t, 10, 10, n)
+	hammerRWMutex(t, 10, 5, n)
 }
 
 // Borrowed from rwmutex_test.go
@@ -267,12 +269,13 @@ func TestUnlockPanic(t *testing.T) {
 
 // Borrowed from rwmutex_test.go
 func TestUnlockPanic2(t *testing.T) {
+	mu := NewDRWMutex(ds, "test-unlock-panic-2")
 	defer func() {
 		if recover() == nil {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
+		mu.RUnlock() // Unlock, so -test.count > 1 works
 	}()
-	mu := NewDRWMutex(ds, "test-unlock-panic-2")
 	mu.RLock(id, source)
 	mu.Unlock()
 }
@@ -290,12 +293,13 @@ func TestRUnlockPanic(t *testing.T) {
 
 // Borrowed from rwmutex_test.go
 func TestRUnlockPanic2(t *testing.T) {
+	mu := NewDRWMutex(ds, "test-runlock-panic-2")
 	defer func() {
 		if recover() == nil {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
+		mu.Unlock() // Unlock, so -test.count > 1 works
 	}()
-	mu := NewDRWMutex(ds, "test-runlock-panic-2")
 	mu.Lock(id, source)
 	mu.RUnlock()
 }
@@ -22,4 +22,7 @@ package dsync
 type Dsync struct {
 	// List of rest client objects, one per lock server.
 	GetLockers func() ([]NetLocker, string)
+
+	// Timeouts to apply.
+	Timeouts Timeouts
 }
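Because Timeouts is a plain value on Dsync, a caller can also copy the defaults and override a single operation without touching any package state. Continuing the hedged sketch from above (the helper name and the 2-second value are illustrative, not part of the commit):

// dsyncWithTighterRefresh copies the package defaults and overrides one field;
// the local copy leaves dsync.DefaultTimeouts itself untouched.
func dsyncWithTighterRefresh(lockers func() ([]dsync.NetLocker, string)) *dsync.Dsync {
    t := dsync.DefaultTimeouts
    t.RefreshCall = 2 * time.Second // illustrative override
    return &dsync.Dsync{
        GetLockers: lockers,
        Timeouts:   t,
    }
}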
@@ -28,6 +28,14 @@ import (
 	"github.com/google/uuid"
 )
 
+const (
+	testDrwMutexAcquireTimeout         = 250 * time.Millisecond
+	testDrwMutexRefreshCallTimeout     = 250 * time.Millisecond
+	testDrwMutexUnlockCallTimeout      = 250 * time.Millisecond
+	testDrwMutexForceUnlockCallTimeout = 250 * time.Millisecond
+	testDrwMutexRefreshInterval        = 100 * time.Millisecond
+)
+
 // TestMain initializes the testing framework
 func TestMain(m *testing.M) {
 	startLockServers()
@@ -40,6 +48,12 @@ func TestMain(m *testing.M) {
 
 	ds = &Dsync{
 		GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() },
+		Timeouts: Timeouts{
+			Acquire:         testDrwMutexAcquireTimeout,
+			RefreshCall:     testDrwMutexRefreshCallTimeout,
+			UnlockCall:      testDrwMutexUnlockCallTimeout,
+			ForceUnlockCall: testDrwMutexForceUnlockCallTimeout,
+		},
 	}
 
 	code := m.Run()
@@ -53,7 +67,7 @@ func TestSimpleLock(t *testing.T) {
 	dm.Lock(id, source)
 
 	// fmt.Println("Lock acquired, waiting...")
-	time.Sleep(2500 * time.Millisecond)
+	time.Sleep(testDrwMutexRefreshCallTimeout)
 
 	dm.Unlock()
 }
@@ -91,7 +105,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 
 	// Release lock after 10 seconds
 	go func() {
-		time.Sleep(10 * time.Second)
+		time.Sleep(5 * testDrwMutexAcquireTimeout)
 		// fmt.Println("Unlocking dm1")
 
 		dm1st.Unlock()
@@ -100,7 +114,7 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 		dm2nd.Lock(id, source)
 
 		// fmt.Printf("2nd lock obtained after 1st lock is released\n")
-		time.Sleep(2500 * time.Millisecond)
+		time.Sleep(testDrwMutexRefreshCallTimeout * 2)
 
 		dm2nd.Unlock()
 	}
@@ -112,14 +126,17 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 	dm3rd := NewDRWMutex(ds, "aap")
 
 	dm1st.Lock(id, source)
-
+	started := time.Now()
+	var expect time.Duration
 	// Release lock after 10 seconds
 	go func() {
-		time.Sleep(10 * time.Second)
+		// TOTAL
+		time.Sleep(2 * testDrwMutexAcquireTimeout)
 		// fmt.Println("Unlocking dm1")
 
 		dm1st.Unlock()
 	}()
+	expect += 2 * testDrwMutexAcquireTimeout
 
 	var wg sync.WaitGroup
 	wg.Add(2)
@@ -131,7 +148,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 
 		// Release lock after 10 seconds
 		go func() {
-			time.Sleep(2500 * time.Millisecond)
+			time.Sleep(2 * testDrwMutexAcquireTimeout)
 			// fmt.Println("Unlocking dm2")
 
 			dm2nd.Unlock()
@@ -140,10 +157,11 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 		dm3rd.Lock(id, source)
 
 		// fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n")
-		time.Sleep(2500 * time.Millisecond)
+		time.Sleep(testDrwMutexRefreshCallTimeout)
 
 		dm3rd.Unlock()
 	}()
+	expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
 
 	go func() {
 		defer wg.Done()
@@ -152,7 +170,7 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 
 		// Release lock after 10 seconds
 		go func() {
-			time.Sleep(2500 * time.Millisecond)
+			time.Sleep(2 * testDrwMutexAcquireTimeout)
 			// fmt.Println("Unlocking dm3")
 
 			dm3rd.Unlock()
@@ -161,12 +179,19 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 		dm2nd.Lock(id, source)
 
 		// fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n")
-		time.Sleep(2500 * time.Millisecond)
+		time.Sleep(testDrwMutexRefreshCallTimeout)
 
 		dm2nd.Unlock()
 	}()
+	expect += 2*testDrwMutexAcquireTimeout + testDrwMutexRefreshCallTimeout
 
 	wg.Wait()
+	// We expect at least 3 x 2 x testDrwMutexAcquireTimeout to have passed
+	elapsed := time.Since(started)
+	if elapsed < expect {
+		t.Errorf("expected at least %v time have passed, however %v passed", expect, elapsed)
+	}
+	t.Logf("expected at least %v time have passed, %v passed", expect, elapsed)
 }
 
 // Test two locks for different resources, both succeed
@@ -176,14 +201,8 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
 
 	dm1.Lock(id, source)
 	dm2.Lock(id, source)
-
-	// fmt.Println("Both locks acquired, waiting...")
-	time.Sleep(2500 * time.Millisecond)
-
 	dm1.Unlock()
 	dm2.Unlock()
-
-	time.Sleep(10 * time.Millisecond)
 }
 
 // Test refreshing lock - refresh should always return true
@@ -194,22 +213,19 @@ func TestSuccessfulLockRefresh(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
-	contextCanceled := make(chan struct{})
+	dm.refreshInterval = testDrwMutexRefreshInterval
 
-	ctx, cl := context.WithCancel(context.Background())
-	cancel := func() {
-		cl()
-		close(contextCanceled)
-	}
+	ctx, cancel := context.WithCancel(context.Background())
 
 	if !dm.GetLock(ctx, cancel, id, source, Options{Timeout: 5 * time.Minute}) {
 		t.Fatal("GetLock() should be successful")
 	}
 
-	timer := time.NewTimer(drwMutexRefreshInterval * 2)
+	// Make it run twice.
+	timer := time.NewTimer(testDrwMutexRefreshInterval * 2)
 
 	select {
-	case <-contextCanceled:
+	case <-ctx.Done():
 		t.Fatal("Lock context canceled which is not expected")
 	case <-timer.C:
 	}
@@ -231,6 +247,7 @@ func TestFailedRefreshLock(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
+	dm.refreshInterval = 500 * time.Millisecond
 	var wg sync.WaitGroup
 	wg.Add(1)
 
@@ -261,14 +278,14 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
-
+	dm.refreshInterval = testDrwMutexUnlockCallTimeout
 	if !dm.GetLock(context.Background(), nil, id, source, Options{Timeout: 5 * time.Minute}) {
 		t.Fatal("GetLock() should be successful")
 	}
 
 	// Add delay to lock server responses to ensure that lock does not timeout
 	for i := range lockServers {
-		lockServers[i].setResponseDelay(2 * drwMutexUnlockCallTimeout)
+		lockServers[i].setResponseDelay(5 * testDrwMutexUnlockCallTimeout)
 		defer lockServers[i].setResponseDelay(0)
 	}
 
@@ -278,7 +295,7 @@ func TestUnlockShouldNotTimeout(t *testing.T) {
 		unlockReturned <- struct{}{}
 	}()
 
-	timer := time.NewTimer(2 * drwMutexUnlockCallTimeout)
+	timer := time.NewTimer(2 * testDrwMutexUnlockCallTimeout)
 	defer timer.Stop()
 
 	select {