make sure to release locks upon timeout (#10596)

fixes #10418
Harshavardhana 2020-09-29 15:18:34 -07:00 committed by GitHub
parent fdf0ae9167
commit 1f9abbee4d
8 changed files with 34 additions and 31 deletions

@@ -233,7 +233,7 @@ func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
}
}

-trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
+trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)

@@ -770,7 +770,7 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
}
}

-tr := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)()
+tr := newCustomHTTPTransport(tlsConfig, rest.DefaultTimeout)()
// Allow more requests to be in flight with higher response header timeout.
tr.ResponseHeaderTimeout = 30 * time.Minute
tr.MaxIdleConns = 64

@@ -23,7 +23,6 @@ import (
"errors"
"io"
"net/url"
-"time"

"github.com/minio/minio/cmd/http"
xhttp "github.com/minio/minio/cmd/http"
@@ -154,7 +153,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
}
}

-trFn := newInternodeHTTPTransport(tlsConfig, 10*time.Second)
+trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)

@@ -883,7 +883,7 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
}
}

-trFn := newInternodeHTTPTransport(tlsConfig, 10*time.Second)
+trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)

// Construct a new health function.

@@ -30,8 +30,8 @@ import (
xnet "github.com/minio/minio/pkg/net"
)

-// DefaultRESTTimeout - default RPC timeout is 15 seconds.
-const DefaultRESTTimeout = 15 * time.Second
+// DefaultTimeout - default REST timeout is 10 seconds.
+const DefaultTimeout = 10 * time.Second

const (
offline = iota
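
For illustration only, here is a minimal standalone sketch of how a shared timeout constant like this typically drives an internode probe; the surrounding helpers are stand-ins, not MinIO's actual code, and only the 10-second value comes from the diff above.

package main

import (
	"context"
	"fmt"
	"time"
)

// DefaultTimeout mirrors the renamed constant; lowering it from 15s to 10s
// tightens every client that derives its deadline from it.
const DefaultTimeout = 10 * time.Second

func main() {
	// A health-check style probe: give the peer at most DefaultTimeout to answer.
	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
	defer cancel()

	select {
	case <-time.After(50 * time.Millisecond): // simulated fast peer response
		fmt.Println("peer healthy")
	case <-ctx.Done():
		fmt.Println("peer timed out:", ctx.Err())
	}
}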

@@ -680,7 +680,7 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
}
}

-trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
+trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)

@@ -497,7 +497,6 @@ func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) fu
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(dialTimeout),
MaxIdleConnsPerHost: 16,
-MaxIdleConns: 16,
IdleConnTimeout: 1 * time.Minute,
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 10 * time.Second,
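
As a rough sketch of the same transport tuning assembled with the standard library alone (the dialer is a placeholder for MinIO's custom dial context, and after this commit MaxIdleConns is simply left at its default):

package main

import (
	"net"
	"net/http"
	"time"
)

// newTransport builds an *http.Transport with roughly the values shown in the
// hunk above; it is an illustrative stand-in, not newCustomHTTPTransport itself.
func newTransport(dialTimeout time.Duration) *http.Transport {
	dialer := &net.Dialer{Timeout: dialTimeout}
	return &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		MaxIdleConnsPerHost:   16,
		IdleConnTimeout:       1 * time.Minute,
		ResponseHeaderTimeout: 3 * time.Minute, // conservative internode-style timeout
		TLSHandshakeTimeout:   10 * time.Second,
	}
}

func main() {
	client := &http.Client{Transport: newTransport(10 * time.Second)}
	_ = client // use client for internode-style requests
}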

@@ -142,7 +142,7 @@ const (
// algorithm until either the lock is acquired successfully or more
// time has elapsed than the timeout value.
func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
-restClnts, _ := dm.clnt.GetLockers()
+restClnts, owner := dm.clnt.GetLockers()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -154,6 +154,26 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
defer cancel()

+// Tolerance is not set, defaults to half of the locker clients.
+tolerance := opts.Tolerance
+if tolerance == 0 {
+	tolerance = len(restClnts) / 2
+}
+
+// Quorum is effectively = total clients subtracted with tolerance limit
+quorum := len(restClnts) - tolerance
+if !isReadLock {
+	// In situations for write locks, as a special case
+	// to avoid split brains we make sure to acquire
+	// quorum + 1 when tolerance is exactly half of the
+	// total locker clients.
+	if quorum == tolerance {
+		quorum++
+	}
+}
+
+tolerance = len(restClnts) - quorum
+
for {
select {
case <-retryCtx.Done():
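
To make the arithmetic in the new block concrete, here is a self-contained sketch (not the dsync package itself) of the quorum and tolerance computation that lockBlocking now performs once, before it starts retrying:

package main

import "fmt"

// computeQuorum reproduces the logic added above: tolerance defaults to half
// of the lockers, and a write lock needs one extra grant whenever quorum and
// tolerance would otherwise tie, so two halves of a partitioned cluster can
// never both believe they hold the same write lock.
func computeQuorum(numLockers, tolerance int, isReadLock bool) (quorum, effectiveTolerance int) {
	if tolerance == 0 {
		tolerance = numLockers / 2
	}
	quorum = numLockers - tolerance
	if !isReadLock && quorum == tolerance {
		quorum++ // split-brain guard for write locks
	}
	return quorum, numLockers - quorum
}

func main() {
	// With 8 lockers and the default tolerance: a read lock needs 4 grants
	// (tolerating 4 failures), a write lock needs 5 (tolerating 3).
	fmt.Println(computeQuorum(8, 0, true))  // 4 4
	fmt.Println(computeQuorum(8, 0, false)) // 5 3
}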
@@ -161,10 +181,14 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
// Caller context canceled or we timedout,
// return false anyways for both situations.
+
+// make sure to unlock any successful locks, since caller has timedout or canceled the request.
+releaseAll(dm.clnt, quorum, owner, &locks, isReadLock, restClnts, dm.Names...)
+
return false
default:
// Try to acquire the lock.
-if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...); locked {
+if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, tolerance, quorum, dm.Names...); locked {
dm.m.Lock()

// If success, copy array to object
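
The change this commit is named for is the releaseAll call above: when the retry context is canceled or times out, any grants that individual lockers already handed out are explicitly unlocked instead of being left behind. A standalone sketch of that idea follows, using a made-up locker interface rather than the real dsync types:

package main

import "fmt"

// locker is a stand-in for a remote lock server; Unlock frees a grant by UID.
type locker interface {
	Unlock(uid string) error
}

type fakeLocker struct{ name string }

func (f fakeLocker) Unlock(uid string) error {
	fmt.Printf("released %s on %s\n", uid, f.name)
	return nil
}

// releasePartial walks the per-locker UID slice filled in during acquisition
// and unlocks every entry that was actually granted (non-empty UID), which is
// what the timeout path must do so a failed caller leaves no stale locks behind.
func releasePartial(lockers []locker, uids []string) {
	for i, uid := range uids {
		if uid == "" {
			continue // this locker never granted, nothing to release
		}
		_ = lockers[i].Unlock(uid)
	}
}

func main() {
	lockers := []locker{fakeLocker{"node-1"}, fakeLocker{"node-2"}, fakeLocker{"node-3"}}
	// Only node-1 and node-3 granted before the caller timed out.
	releasePartial(lockers, []string{"uid-1", "", "uid-3"})
}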
@@ -186,32 +210,13 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
}

// lock tries to acquire the distributed lock, returning true or false.
-func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance int, lockNames ...string) bool {
+func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance, quorum int, lockNames ...string) bool {
for i := range *locks {
(*locks)[i] = ""
}

restClnts, owner := ds.GetLockers()

-// Tolerance is not set, defaults to half of the locker clients.
-if tolerance == 0 {
-	tolerance = len(restClnts) / 2
-}
-
-// Quorum is effectively = total clients subtracted with tolerance limit
-quorum := len(restClnts) - tolerance
-if !isReadLock {
-	// In situations for write locks, as a special case
-	// to avoid split brains we make sure to acquire
-	// quorum + 1 when tolerance is exactly half of the
-	// total locker clients.
-	if quorum == tolerance {
-		quorum++
-	}
-}
-
-tolerance = len(restClnts) - quorum
-
// Create buffered channel of size equal to total number of nodes.
ch := make(chan Granted, len(restClnts))
defer close(ch)