mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
fix: Pass context all the way down to the network call in lockers (#10161)
Context timeout might race on each other when timeouts are lower i.e when two lock attempts happened very quickly on the same resource and the servers were yet trying to establish quorum. This situation can lead to locks held which wouldn't be unlocked and subsequent lock attempts would fail. This would require a complete server restart. A potential of this issue happening is when server is booting up and we are trying to hold a 'transaction.lock' in quick bursts of timeout.
This commit is contained in:
parent
f7259adf83
commit
fe157166ca
@ -23,7 +23,7 @@ import (
|
||||
|
||||
// ClusterCheckHandler returns if the server is ready for requests.
|
||||
func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ClusterCheckCheckHandler")
|
||||
ctx := newContext(r, w, "ClusterCheckHandler")
|
||||
|
||||
objLayer := newObjectLayerFn()
|
||||
// Service not initialized yet
|
||||
|
@ -451,7 +451,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
||||
for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) {
|
||||
// let one of the server acquire the lock, if not let them timeout.
|
||||
// which shall be retried again by this loop.
|
||||
if err := txnLk.GetLock(newDynamicTimeout(1*time.Second, 5*time.Second)); err != nil {
|
||||
if err := txnLk.GetLock(newDynamicTimeout(3*time.Second, 5*time.Second)); err != nil {
|
||||
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
|
||||
continue
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@ -71,30 +72,35 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
|
||||
return noLkCnt == len(resources)
|
||||
}
|
||||
|
||||
func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
default:
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
if !l.canTakeLock(args.Resources...) {
|
||||
// Not all locks can be taken on resources,
|
||||
// reject it completely.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// No locks held on the all resources, so claim write
|
||||
// lock on all resources at once.
|
||||
for _, resource := range args.Resources {
|
||||
l.lockMap[resource] = []lockRequesterInfo{
|
||||
{
|
||||
Writer: true,
|
||||
Source: args.Source,
|
||||
UID: args.UID,
|
||||
Timestamp: UTCNow(),
|
||||
TimeLastCheck: UTCNow(),
|
||||
},
|
||||
if !l.canTakeLock(args.Resources...) {
|
||||
// Not all locks can be taken on resources,
|
||||
// reject it completely.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// No locks held on the all resources, so claim write
|
||||
// lock on all resources at once.
|
||||
for _, resource := range args.Resources {
|
||||
l.lockMap[resource] = []lockRequesterInfo{
|
||||
{
|
||||
Writer: true,
|
||||
Source: args.Source,
|
||||
UID: args.UID,
|
||||
Timestamp: UTCNow(),
|
||||
TimeLastCheck: UTCNow(),
|
||||
},
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
@ -138,28 +144,33 @@ func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bo
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
lrInfo := lockRequesterInfo{
|
||||
Writer: false,
|
||||
Source: args.Source,
|
||||
UID: args.UID,
|
||||
Timestamp: UTCNow(),
|
||||
TimeLastCheck: UTCNow(),
|
||||
}
|
||||
resource := args.Resources[0]
|
||||
if lri, ok := l.lockMap[resource]; ok {
|
||||
if reply = !isWriteLock(lri); reply {
|
||||
// Unless there is a write lock
|
||||
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
|
||||
func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
default:
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
lrInfo := lockRequesterInfo{
|
||||
Writer: false,
|
||||
Source: args.Source,
|
||||
UID: args.UID,
|
||||
Timestamp: UTCNow(),
|
||||
TimeLastCheck: UTCNow(),
|
||||
}
|
||||
} else {
|
||||
// No locks held on the given name, so claim (first) read lock
|
||||
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
|
||||
reply = true
|
||||
resource := args.Resources[0]
|
||||
if lri, ok := l.lockMap[resource]; ok {
|
||||
if reply = !isWriteLock(lri); reply {
|
||||
// Unless there is a write lock
|
||||
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
|
||||
}
|
||||
} else {
|
||||
// No locks held on the given name, so claim (first) read lock
|
||||
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
|
||||
reply = true
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
@ -202,22 +213,27 @@ func (l *localLocker) IsOnline() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
default:
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
// Lock found, proceed to verify if belongs to given uid.
|
||||
for _, resource := range args.Resources {
|
||||
if lri, ok := l.lockMap[resource]; ok {
|
||||
// Check whether uid is still active
|
||||
for _, entry := range lri {
|
||||
if entry.UID == args.UID {
|
||||
return false, nil
|
||||
// Lock found, proceed to verify if belongs to given uid.
|
||||
for _, resource := range args.Resources {
|
||||
if lri, ok := l.lockMap[resource]; ok {
|
||||
// Check whether uid is still active
|
||||
for _, entry := range lri {
|
||||
if entry.UID == args.UID {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
|
||||
|
@ -58,7 +58,7 @@ func (client *lockRESTClient) String() string {
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
||||
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||
// after verifying format.json
|
||||
func (client *lockRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
func (client *lockRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
if values == nil {
|
||||
values = make(url.Values)
|
||||
}
|
||||
@ -83,7 +83,7 @@ func (client *lockRESTClient) Close() error {
|
||||
}
|
||||
|
||||
// restCall makes a call to the lock REST server.
|
||||
func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) {
|
||||
func (client *lockRESTClient) restCall(ctx context.Context, call string, args dsync.LockArgs) (reply bool, err error) {
|
||||
values := url.Values{}
|
||||
values.Set(lockRESTUID, args.UID)
|
||||
values.Set(lockRESTSource, args.Source)
|
||||
@ -92,7 +92,7 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
|
||||
buffer.WriteString(resource)
|
||||
buffer.WriteString("\n")
|
||||
}
|
||||
respBody, err := client.call(call, values, &buffer, -1)
|
||||
respBody, err := client.callWithContext(ctx, call, values, &buffer, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
switch err {
|
||||
case nil:
|
||||
@ -105,28 +105,28 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
|
||||
}
|
||||
|
||||
// RLock calls read lock REST API.
|
||||
func (client *lockRESTClient) RLock(args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(lockRESTMethodRLock, args)
|
||||
func (client *lockRESTClient) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(ctx, lockRESTMethodRLock, args)
|
||||
}
|
||||
|
||||
// Lock calls lock REST API.
|
||||
func (client *lockRESTClient) Lock(args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(lockRESTMethodLock, args)
|
||||
func (client *lockRESTClient) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(ctx, lockRESTMethodLock, args)
|
||||
}
|
||||
|
||||
// RUnlock calls read unlock REST API.
|
||||
func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(lockRESTMethodRUnlock, args)
|
||||
return client.restCall(context.Background(), lockRESTMethodRUnlock, args)
|
||||
}
|
||||
|
||||
// Unlock calls write unlock RPC.
|
||||
func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
return client.restCall(lockRESTMethodUnlock, args)
|
||||
return client.restCall(context.Background(), lockRESTMethodUnlock, args)
|
||||
}
|
||||
|
||||
// Expired calls expired handler to check if lock args have expired.
|
||||
func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err error) {
|
||||
return client.restCall(lockRESTMethodExpired, args)
|
||||
func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
|
||||
return client.restCall(ctx, lockRESTMethodExpired, args)
|
||||
}
|
||||
|
||||
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
|
||||
|
@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/minio/minio/pkg/dsync"
|
||||
@ -34,12 +35,12 @@ func TestLockRESTlient(t *testing.T) {
|
||||
}
|
||||
|
||||
// Attempt all calls.
|
||||
_, err = lkClient.RLock(dsync.LockArgs{})
|
||||
_, err = lkClient.RLock(context.Background(), dsync.LockArgs{})
|
||||
if err == nil {
|
||||
t.Fatal("Expected for Rlock to fail")
|
||||
}
|
||||
|
||||
_, err = lkClient.Lock(dsync.LockArgs{})
|
||||
_, err = lkClient.Lock(context.Background(), dsync.LockArgs{})
|
||||
if err == nil {
|
||||
t.Fatal("Expected for Lock to fail")
|
||||
}
|
||||
|
@ -96,7 +96,7 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
success, err := l.ll.Lock(args)
|
||||
success, err := l.ll.Lock(r.Context(), args)
|
||||
if err == nil && !success {
|
||||
err = errLockConflict
|
||||
}
|
||||
@ -141,7 +141,7 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
success, err := l.ll.RLock(args)
|
||||
success, err := l.ll.RLock(r.Context(), args)
|
||||
if err == nil && !success {
|
||||
err = errLockConflict
|
||||
}
|
||||
@ -185,20 +185,14 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
l.ll.mutex.Lock()
|
||||
defer l.ll.mutex.Unlock()
|
||||
|
||||
// Lock found, proceed to verify if belongs to given uid.
|
||||
for _, resource := range args.Resources {
|
||||
if lri, ok := l.ll.lockMap[resource]; ok {
|
||||
// Check whether uid is still active
|
||||
for _, entry := range lri {
|
||||
if entry.UID == args.UID {
|
||||
l.writeErrorResponse(w, errLockNotExpired)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
expired, err := l.ll.Expired(r.Context(), args)
|
||||
if err != nil {
|
||||
l.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
if !expired {
|
||||
l.writeErrorResponse(w, errLockNotExpired)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@ -219,7 +213,10 @@ func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterI
|
||||
for idx := range lriArray {
|
||||
// Check whether enough time has gone by since last check
|
||||
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
|
||||
rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]})
|
||||
rslt = append(rslt, nameLockRequesterInfoPair{
|
||||
name: name,
|
||||
lri: lriArray[idx],
|
||||
})
|
||||
lriArray[idx].TimeLastCheck = UTCNow()
|
||||
}
|
||||
}
|
||||
@ -254,12 +251,15 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
|
||||
|
||||
// Call back to original server verify whether the lock is
|
||||
// still active (based on name & uid)
|
||||
expired, err := c.Expired(dsync.LockArgs{
|
||||
expired, err := c.Expired(ctx, dsync.LockArgs{
|
||||
UID: nlrip.lri.UID,
|
||||
Resources: []string{nlrip.name},
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
nlripsMap[nlrip.name]++
|
||||
c.Close()
|
||||
|
@ -85,10 +85,20 @@ const (
|
||||
querySep = "?"
|
||||
)
|
||||
|
||||
type restError string
|
||||
|
||||
func (e restError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
func (e restError) Timeout() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// CallWithContext - make a REST call with context.
|
||||
func (c *Client) CallWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
|
||||
if !c.IsOnline() {
|
||||
return nil, &NetworkError{Err: errors.New("remote server offline")}
|
||||
return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}}
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPost, c.url.String()+method+querySep+values.Encode(), body)
|
||||
if err != nil {
|
||||
|
@ -229,7 +229,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
for range retry.NewTimer(retryCtx) {
|
||||
// let one of the server acquire the lock, if not let them timeout.
|
||||
// which shall be retried again by this loop.
|
||||
if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 3*time.Second)); err != nil {
|
||||
if err = txnLk.GetLock(newDynamicTimeout(3*time.Second, 3*time.Second)); err != nil {
|
||||
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
|
||||
continue
|
||||
}
|
||||
|
@ -140,8 +140,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id,
|
||||
locks := make([]string, len(restClnts))
|
||||
|
||||
// Try to acquire the lock.
|
||||
success := lock(dm.clnt, &locks, id, source, isReadLock, dm.Names...)
|
||||
if !success {
|
||||
locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, dm.Names...)
|
||||
if !locked {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -158,16 +158,16 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id,
|
||||
}
|
||||
|
||||
dm.m.Unlock()
|
||||
return true
|
||||
return locked
|
||||
}
|
||||
|
||||
// Failed to acquire the lock on this attempt, incrementally wait
|
||||
// for a longer back-off time and try again afterwards.
|
||||
return false
|
||||
return locked
|
||||
}
|
||||
|
||||
// lock tries to acquire the distributed lock, returning true or false.
|
||||
func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool {
|
||||
func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool {
|
||||
|
||||
restClnts := ds.GetLockersFn()
|
||||
|
||||
@ -199,11 +199,11 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa
|
||||
var locked bool
|
||||
var err error
|
||||
if isReadLock {
|
||||
if locked, err = c.RLock(args); err != nil {
|
||||
if locked, err = c.RLock(ctx, args); err != nil {
|
||||
log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c)
|
||||
}
|
||||
} else {
|
||||
if locked, err = c.Lock(args); err != nil {
|
||||
if locked, err = c.Lock(ctx, args); err != nil {
|
||||
log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c)
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package dsync_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/rpc"
|
||||
"sync"
|
||||
|
||||
@ -89,12 +90,12 @@ func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{}
|
||||
return err
|
||||
}
|
||||
|
||||
func (rpcClient *ReconnectRPCClient) RLock(args LockArgs) (status bool, err error) {
|
||||
func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) {
|
||||
err = rpcClient.Call("Dsync.RLock", &args, &status)
|
||||
return status, err
|
||||
}
|
||||
|
||||
func (rpcClient *ReconnectRPCClient) Lock(args LockArgs) (status bool, err error) {
|
||||
func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) {
|
||||
err = rpcClient.Call("Dsync.Lock", &args, &status)
|
||||
return status, err
|
||||
}
|
||||
@ -109,7 +110,7 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err
|
||||
return status, err
|
||||
}
|
||||
|
||||
func (rpcClient *ReconnectRPCClient) Expired(args LockArgs) (expired bool, err error) {
|
||||
func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs) (expired bool, err error) {
|
||||
err = rpcClient.Call("Dsync.Expired", &args, &expired)
|
||||
return expired, err
|
||||
}
|
||||
|
@ -16,6 +16,8 @@
|
||||
|
||||
package dsync
|
||||
|
||||
import "context"
|
||||
|
||||
// LockArgs is minimal required values for any dsync compatible lock operation.
|
||||
type LockArgs struct {
|
||||
// Unique ID of lock/unlock request.
|
||||
@ -34,12 +36,12 @@ type NetLocker interface {
|
||||
// Do read lock for given LockArgs. It should return
|
||||
// * a boolean to indicate success/failure of the operation
|
||||
// * an error on failure of lock request operation.
|
||||
RLock(args LockArgs) (bool, error)
|
||||
RLock(ctx context.Context, args LockArgs) (bool, error)
|
||||
|
||||
// Do write lock for given LockArgs. It should return
|
||||
// * a boolean to indicate success/failure of the operation
|
||||
// * an error on failure of lock request operation.
|
||||
Lock(args LockArgs) (bool, error)
|
||||
Lock(ctx context.Context, args LockArgs) (bool, error)
|
||||
|
||||
// Do read unlock for given LockArgs. It should return
|
||||
// * a boolean to indicate success/failure of the operation
|
||||
@ -52,7 +54,7 @@ type NetLocker interface {
|
||||
Unlock(args LockArgs) (bool, error)
|
||||
|
||||
// Expired returns if current lock args has expired.
|
||||
Expired(args LockArgs) (bool, error)
|
||||
Expired(ctx context.Context, args LockArgs) (bool, error)
|
||||
|
||||
// Returns underlying endpoint of this lock client instance.
|
||||
String() string
|
||||
|
Loading…
x
Reference in New Issue
Block a user