lock: Fix decision when a lock needs to be removed (#14095)
The code was not properly deciding whether a lock needs to be removed once it no longer has quorum. After this commit, a lock is forcefully unlocked when the number of nodes reporting that they cannot find the lock internally is large enough to break the quorum. The code is also simplified.
commit b106b1c131
parent 0df31f63ab
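To make the new decision rule concrete, here is a small standalone sketch (illustrative only; `quorumLost` and its parameters are hypothetical names, not part of the dsync API) of the threshold this commit introduces: a lock is force-unlocked once the number of lockers that cannot find it exceeds len(restClnts) - quorum.

```go
package main

import "fmt"

// quorumLost reports whether "lock not found" answers alone already make
// quorum impossible, mirroring the check lockNotFound > len(restClnts)-quorum.
func quorumLost(totalLockers, quorum, lockNotFound int) bool {
	return lockNotFound > totalLockers-quorum
}

func main() {
	// Example: 4 lockers with a quorum of 3.
	fmt.Println(quorumLost(4, 3, 1)) // false: 3 lockers may still hold the lock
	fmt.Println(quorumLost(4, 3, 2)) // true: at most 2 holders remain, quorum is broken
}
```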
@@ -234,8 +234,8 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 			case <-refreshTimer.C:
 				refreshTimer.Reset(drwMutexRefreshInterval)
 
-				refreshed, err := refresh(ctx, dm.clnt, id, source, quorum)
-				if err == nil && !refreshed {
+				noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
+				if err == nil && noQuorum {
 					// Clean the lock locally and in remote nodes
 					forceUnlock(ctx, dm.clnt, id)
 
 					// Execute the caller lock loss callback
@@ -273,10 +273,12 @@ func forceUnlock(ctx context.Context, ds *Dsync, id string) {
 
 type refreshResult struct {
 	offline   bool
-	succeeded bool
+	refreshed bool
 }
 
-func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (bool, error) {
+// Refresh the given lock in all nodes, return true to indicate if a lock
+// does not exist in enough quorum nodes.
+func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int) (bool, error) {
 	restClnts, _ := ds.GetLockers()
 
 	// Create buffered channel of size equal to total number of nodes.
@@ -302,17 +304,13 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 			defer cancel()
 
 			refreshed, err := c.Refresh(ctx, args)
-			if refreshed && err == nil {
-				ch <- refreshResult{succeeded: true}
-			} else {
 			if err != nil {
 				ch <- refreshResult{offline: true}
 				log("dsync: Unable to call Refresh failed with %s for %#v at %s\n", err, args, c)
 			} else {
-				ch <- refreshResult{succeeded: false}
+				ch <- refreshResult{refreshed: refreshed}
 				log("dsync: Refresh returned false for %#v at %s\n", args, c)
 			}
-			}
 		}(index, c)
 	}
 
@@ -322,39 +320,32 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 	// b) received too many refreshed for quorum to be still possible
 	// c) timed out
 	//
-	i, refreshFailed, refreshSucceeded := 0, 0, 0
+	lockNotFound, lockRefreshed := 0, 0
 	done := false
 
-	for ; i < len(restClnts); i++ {
+	for i := 0; i < len(restClnts); i++ {
 		select {
-		case refresh := <-ch:
-			if refresh.offline {
+		case refreshResult := <-ch:
+			if refreshResult.offline {
 				continue
 			}
-			if refresh.succeeded {
-				refreshSucceeded++
+			if refreshResult.refreshed {
+				lockRefreshed++
 			} else {
-				refreshFailed++
+				lockNotFound++
 			}
-			if refreshFailed > quorum {
-				// We know that we are not going to succeed with refresh
+			if lockRefreshed >= quorum || lockNotFound > len(restClnts)-quorum {
 				done = true
 			}
 		case <-ctx.Done():
 			// Refreshing is canceled
 			return false, ctx.Err()
 		}
 
 		if done {
 			break
 		}
 	}
 
-	refreshQuorum := refreshSucceeded >= quorum
-	if !refreshQuorum {
-		refreshQuorum = refreshFailed < quorum
-	}
-
 	// We may have some unused results in ch, release them async.
 	go func() {
 		wg.Wait()
@@ -363,7 +354,8 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 		}
 	}()
 
-	return refreshQuorum, nil
+	noQuorum := lockNotFound > len(restClnts)-quorum
+	return noQuorum, nil
 }
 
 // lock tries to acquire the distributed lock, returning true or false.
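For readers of the diff alone, the refactored collection loop follows a common Go pattern: each locker replies on a buffered channel, and the loop stops as soon as the outcome is decided. The sketch below is a self-contained approximation under assumed names (`result`, `noQuorumAfterRefresh`); it is not the dsync implementation.

```go
package main

import "fmt"

// result is a stand-in for dsync's refreshResult.
type result struct {
	offline   bool
	refreshed bool
}

// noQuorumAfterRefresh drains up to total answers from ch and stops early once
// either quorum is confirmed or quorum has become impossible. It returns true
// when the "lock not found" answers break the quorum.
func noQuorumAfterRefresh(ch <-chan result, total, quorum int) bool {
	lockNotFound, lockRefreshed := 0, 0
	for i := 0; i < total; i++ {
		r := <-ch
		if r.offline {
			continue // unreachable lockers count as neither answer
		}
		if r.refreshed {
			lockRefreshed++
		} else {
			lockNotFound++
		}
		if lockRefreshed >= quorum || lockNotFound > total-quorum {
			break // the decision can no longer change
		}
	}
	return lockNotFound > total-quorum
}

func main() {
	ch := make(chan result, 4)
	// 4 lockers, quorum 3: one refresh succeeds, one locker is offline,
	// and two lockers cannot find the lock, so quorum is lost.
	for _, r := range []result{{refreshed: true}, {}, {offline: true}, {}} {
		ch <- r
	}
	fmt.Println(noQuorumAfterRefresh(ch, 4, 3)) // true
}
```

Because the channel is buffered to the number of lockers, stopping early cannot block the senders; the real code additionally drains leftover results asynchronously, as seen in the hunk above.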
@@ -236,10 +236,46 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
 		time.Sleep(10 * time.Millisecond)
 	}
 
-// Test refreshing lock
+// Test refreshing lock - refresh should always return true
+//
+func TestSuccessfulLockRefresh(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	dm := NewDRWMutex(ds, "aap")
+	contextCanceled := make(chan struct{})
+
+	ctx, cl := context.WithCancel(context.Background())
+	cancel := func() {
+		cl()
+		close(contextCanceled)
+	}
+
+	if !dm.GetLock(ctx, cancel, id, source, Options{Timeout: 5 * time.Minute}) {
+		t.Fatal("GetLock() should be successful")
+	}
+
+	timer := time.NewTimer(drwMutexRefreshInterval * 2)
+
+	select {
+	case <-contextCanceled:
+		t.Fatal("Lock context canceled which is not expected")
+	case <-timer.C:
+	}
+
+	// Should be safe operation in all cases
+	dm.Unlock()
+}
+
+// Test canceling context while quorum servers report lock not found
 func TestFailedRefreshLock(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
 	// Simulate Refresh RPC response to return no locking found
-	for i := range lockServers {
+	for i := range lockServers[:3] {
 		lockServers[i].setRefreshReply(false)
 		defer lockServers[i].setRefreshReply(true)
 	}
@@ -270,6 +306,10 @@ func TestFailedRefreshLock(t *testing.T) {
 
 // Test Unlock should not timeout
 func TestUnlockShouldNotTimeout(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
 	dm := NewDRWMutex(ds, "aap")
 
 	if !dm.GetLock(context.Background(), nil, id, source, Options{Timeout: 5 * time.Minute}) {
|
Loading…
x
Reference in New Issue
Block a user