diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index 431f0fb36..80a90b947 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -109,7 +109,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
return true, nil
}
-func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
+func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
@@ -177,7 +177,7 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
return reply, nil
}
-func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
+func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
var lri []lockRequesterInfo
diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go
index 88b594d81..e45e101b1 100644
--- a/cmd/lock-rest-client.go
+++ b/cmd/lock-rest-client.go
@@ -122,8 +122,8 @@ func (client *lockRESTClient) Lock(ctx context.Context, args dsync.LockArgs) (re
}
// RUnlock calls read unlock REST API.
-func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err error) {
- return client.restCall(context.Background(), lockRESTMethodRUnlock, args)
+func (client *lockRESTClient) RUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
+ return client.restCall(ctx, lockRESTMethodRUnlock, args)
}
// RUnlock calls read unlock REST API.
@@ -132,8 +132,8 @@ func (client *lockRESTClient) Refresh(ctx context.Context, args dsync.LockArgs)
}
// Unlock calls write unlock RPC.
-func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) {
- return client.restCall(context.Background(), lockRESTMethodUnlock, args)
+func (client *lockRESTClient) Unlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
+ return client.restCall(ctx, lockRESTMethodUnlock, args)
}
// ForceUnlock calls force unlock handler to forcibly unlock an active lock.
diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go
index 19ae9fc95..46f9f7889 100644
--- a/cmd/lock-rest-client_test.go
+++ b/cmd/lock-rest-client_test.go
@@ -47,12 +47,12 @@ func TestLockRESTlient(t *testing.T) {
t.Fatal("Expected for Lock to fail")
}
- _, err = lkClient.RUnlock(dsync.LockArgs{})
+ _, err = lkClient.RUnlock(context.Background(), dsync.LockArgs{})
if err == nil {
t.Fatal("Expected for RUnlock to fail")
}
- _, err = lkClient.Unlock(dsync.LockArgs{})
+ _, err = lkClient.Unlock(context.Background(), dsync.LockArgs{})
if err == nil {
t.Fatal("Expected for Unlock to fail")
}
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index a92229743..41d0b5ab2 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -156,7 +156,7 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
return
}
- _, err = l.ll.Unlock(args)
+ _, err = l.ll.Unlock(context.Background(), args)
// Ignore the Unlock() "reply" return value because if err == nil, "reply" is always true
// Consequently, if err != nil, reply is always false
if err != nil {
@@ -203,7 +203,7 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
// Ignore the RUnlock() "reply" return value because if err == nil, "reply" is always true.
// Consequently, if err != nil, reply is always false
- if _, err = l.ll.RUnlock(args); err != nil {
+ if _, err = l.ll.RUnlock(context.Background(), args); err != nil {
l.writeErrorResponse(w, err)
return
}
diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go
index 8909630d8..9d9a5b75a 100644
--- a/pkg/dsync/drwmutex.go
+++ b/pkg/dsync/drwmutex.go
@@ -46,7 +46,10 @@ func log(format string, data ...interface{}) {
const drwMutexAcquireTimeout = 1 * time.Second // 1 second.
// dRWMutexRefreshTimeout - timeout for the refresh call
-const drwMutexRefreshTimeout = 5 * time.Second
+const drwMutexRefreshCallTimeout = 5 * time.Second
+
+// dRWMutexUnlockTimeout - timeout for the unlock call
+const drwMutexUnlockCallTimeout = 30 * time.Second
// dRWMutexRefreshInterval - the interval between two refresh calls
const drwMutexRefreshInterval = 10 * time.Second
@@ -275,7 +278,7 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int, lock
Quorum: quorum,
}
- ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshTimeout)
+ ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
defer cancel()
refreshed, err := c.Refresh(ctx, args)
@@ -613,13 +616,16 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
Resources: names,
}
+ ctx, cancel := context.WithTimeout(context.Background(), drwMutexUnlockCallTimeout)
+ defer cancel()
+
if isReadLock {
- if _, err := c.RUnlock(args); err != nil {
+ if _, err := c.RUnlock(ctx, args); err != nil {
log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c)
return false
}
} else {
- if _, err := c.Unlock(args); err != nil {
+ if _, err := c.Unlock(ctx, args); err != nil {
log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c)
return false
}
diff --git a/pkg/dsync/drwmutex_test.go b/pkg/dsync/drwmutex_test.go
index c429d8ff4..0038b008e 100644
--- a/pkg/dsync/drwmutex_test.go
+++ b/pkg/dsync/drwmutex_test.go
@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package dsync_test
+package dsync
import (
"context"
@@ -24,8 +24,6 @@ import (
"sync/atomic"
"testing"
"time"
-
- . "github.com/minio/minio/pkg/dsync"
)
const (
diff --git a/pkg/dsync/dsync-server_test.go b/pkg/dsync/dsync-server_test.go
index a8daba9f3..7a1a1b337 100644
--- a/pkg/dsync/dsync-server_test.go
+++ b/pkg/dsync/dsync-server_test.go
@@ -15,13 +15,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package dsync_test
+package dsync
import (
"fmt"
"sync"
-
- . "github.com/minio/minio/pkg/dsync"
+ "sync/atomic"
+ "time"
)
const WriteLock = -1
@@ -34,6 +34,9 @@ type lockServer struct {
// Refresh returns lock not found if set to true
lockNotFound bool
+
+ // Set to true if you want peers servers to do not respond
+ responseDelay int64
}
func (l *lockServer) setRefreshReply(refreshed bool) {
@@ -42,7 +45,15 @@ func (l *lockServer) setRefreshReply(refreshed bool) {
l.lockNotFound = !refreshed
}
+func (l *lockServer) setResponseDelay(responseDelay time.Duration) {
+ atomic.StoreInt64(&l.responseDelay, int64(responseDelay))
+}
+
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
if _, *reply = l.lockMap[args.Resources[0]]; !*reply {
@@ -53,6 +64,10 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -69,6 +84,10 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
const ReadLock = 1
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -84,6 +103,10 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -102,6 +125,10 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) Refresh(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
*reply = !l.lockNotFound
@@ -109,6 +136,10 @@ func (l *lockServer) Refresh(args *LockArgs, reply *bool) error {
}
func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
+ if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
+ time.Sleep(time.Duration(d))
+ }
+
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
diff --git a/pkg/dsync/dsync_test.go b/pkg/dsync/dsync_test.go
index 646653cad..853d8e9df 100644
--- a/pkg/dsync/dsync_test.go
+++ b/pkg/dsync/dsync_test.go
@@ -15,12 +15,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package dsync_test
+package dsync
import (
"context"
"fmt"
- "log"
+ golog "log"
"math/rand"
"net"
"net/http"
@@ -32,8 +32,6 @@ import (
"time"
"github.com/google/uuid"
- "github.com/minio/minio/pkg/dsync"
- . "github.com/minio/minio/pkg/dsync"
)
const numberOfNodes = 5
@@ -56,7 +54,7 @@ func startRPCServers() {
server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i]))
l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345))
if e != nil {
- log.Fatal("listen error:", e)
+ golog.Fatal("listen error:", e)
}
go http.Serve(l, nil)
@@ -244,6 +242,7 @@ func TestFailedRefreshLock(t *testing.T) {
// Simulate Refresh RPC response to return no locking found
for i := range lockServers {
lockServers[i].setRefreshReply(false)
+ defer lockServers[i].setRefreshReply(true)
}
dm := NewDRWMutex(ds, "aap")
@@ -256,7 +255,7 @@ func TestFailedRefreshLock(t *testing.T) {
wg.Done()
}
- if !dm.GetLock(ctx, cancel, id, source, dsync.Options{Timeout: 5 * time.Minute}) {
+ if !dm.GetLock(ctx, cancel, id, source, Options{Timeout: 5 * time.Minute}) {
t.Fatal("GetLock() should be successful")
}
@@ -268,10 +267,35 @@ func TestFailedRefreshLock(t *testing.T) {
// Should be safe operation in all cases
dm.Unlock()
+}
- // Revert Refresh RPC response to locking found
+// Test Unlock should not timeout
+func TestUnlockShouldNotTimeout(t *testing.T) {
+ dm := NewDRWMutex(ds, "aap")
+
+ if !dm.GetLock(context.Background(), nil, id, source, Options{Timeout: 5 * time.Minute}) {
+ t.Fatal("GetLock() should be successful")
+ }
+
+ // Add delay to lock server responses to ensure that lock does not timeout
for i := range lockServers {
- lockServers[i].setRefreshReply(false)
+ lockServers[i].setResponseDelay(2 * drwMutexUnlockCallTimeout)
+ defer lockServers[i].setResponseDelay(0)
+ }
+
+ unlockReturned := make(chan struct{}, 1)
+ go func() {
+ dm.Unlock()
+ unlockReturned <- struct{}{}
+ }()
+
+ timer := time.NewTimer(2 * drwMutexUnlockCallTimeout)
+ defer timer.Stop()
+
+ select {
+ case <-unlockReturned:
+ t.Fatal("Unlock timed out, which should not happen")
+ case <-timer.C:
}
}
diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go
index 0692c0592..2af7b0f17 100644
--- a/pkg/dsync/rpc-client-impl_test.go
+++ b/pkg/dsync/rpc-client-impl_test.go
@@ -15,14 +15,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package dsync_test
+package dsync
import (
"context"
"net/rpc"
"sync"
-
- . "github.com/minio/minio/pkg/dsync"
)
// ReconnectRPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
@@ -105,12 +103,12 @@ func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (s
return status, err
}
-func (rpcClient *ReconnectRPCClient) RUnlock(args LockArgs) (status bool, err error) {
+func (rpcClient *ReconnectRPCClient) RUnlock(ctx context.Context, args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.RUnlock", &args, &status)
return status, err
}
-func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err error) {
+func (rpcClient *ReconnectRPCClient) Unlock(ctx context.Context, args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.Unlock", &args, &status)
return status, err
}
diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go
index dbf5659be..38ee679f6 100644
--- a/pkg/dsync/rpc-client-interface.go
+++ b/pkg/dsync/rpc-client-interface.go
@@ -54,12 +54,16 @@ type NetLocker interface {
// Do read unlock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of unlock request operation.
- RUnlock(args LockArgs) (bool, error)
+ // Canceling the context will abort the remote call.
+ // In that case, the resource may or may not be unlocked.
+ RUnlock(ctx context.Context, args LockArgs) (bool, error)
// Do write unlock for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of unlock request operation.
- Unlock(args LockArgs) (bool, error)
+ // Canceling the context will abort the remote call.
+ // In that case, the resource may or may not be unlocked.
+ Unlock(ctx context.Context, args LockArgs) (bool, error)
// Refresh the given lock to prevent it from becoming stale
Refresh(ctx context.Context, args LockArgs) (bool, error)