lock: Timeout Unlock RPC call (#12213)

RPC unlock call needs to be timed out otherwise this can block
indefinitely.

Signed-off-by: Anis Elleuch <anis@min.io>
This commit is contained in:
Anis Elleuch
2021-05-11 10:11:29 +01:00
committed by GitHub
parent b81fada834
commit 0b34dfb479
10 changed files with 96 additions and 35 deletions

View File

@@ -15,13 +15,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dsync_test
package dsync
import (
"fmt"
"sync"
. "github.com/minio/minio/pkg/dsync"
"sync/atomic"
"time"
)
const WriteLock = -1
@@ -34,6 +34,9 @@ type lockServer struct {
// Refresh returns lock not found if set to true
lockNotFound bool
// Set to true if you want peers servers to do not respond
responseDelay int64
}
func (l *lockServer) setRefreshReply(refreshed bool) {
@@ -42,7 +45,15 @@ func (l *lockServer) setRefreshReply(refreshed bool) {
l.lockNotFound = !refreshed
}
func (l *lockServer) setResponseDelay(responseDelay time.Duration) {
atomic.StoreInt64(&l.responseDelay, int64(responseDelay))
}
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
if _, *reply = l.lockMap[args.Resources[0]]; !*reply {
@@ -53,6 +64,10 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -69,6 +84,10 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
const ReadLock = 1
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -84,6 +103,10 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
@@ -102,6 +125,10 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
}
func (l *lockServer) Refresh(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
*reply = !l.lockNotFound
@@ -109,6 +136,10 @@ func (l *lockServer) Refresh(args *LockArgs, reply *bool) error {
}
func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
time.Sleep(time.Duration(d))
}
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {