mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Add lock expiry handler to expire state locks (#8562)
This commit is contained in:
parent
e542084c37
commit
720442b1a2
@ -125,7 +125,7 @@ function start_minio_dist_erasure_sets()
|
|||||||
"${MINIO[@]}" server --address=:9009 "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets4" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets5" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets6" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets7" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets8" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets9" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets10" "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets11" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets12" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets13" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets14" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets15" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets16" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets17" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets18" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets19" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets20" >"$WORK_DIR/dist-minio-9009.log" 2>&1 &
|
"${MINIO[@]}" server --address=:9009 "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets4" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets5" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets6" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets7" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets8" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets9" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets10" "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets11" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets12" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets13" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets14" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets15" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets16" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets17" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets18" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets19" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets20" >"$WORK_DIR/dist-minio-9009.log" 2>&1 &
|
||||||
minio_pids[9]=$!
|
minio_pids[9]=$!
|
||||||
|
|
||||||
sleep 30
|
sleep 35
|
||||||
echo "${minio_pids[@]}"
|
echo "${minio_pids[@]}"
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,7 +170,7 @@ function start_minio_dist_erasure()
|
|||||||
"${MINIO[@]}" server --address=:9003 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9003.log" 2>&1 &
|
"${MINIO[@]}" server --address=:9003 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9003.log" 2>&1 &
|
||||||
minio_pids[3]=$!
|
minio_pids[3]=$!
|
||||||
|
|
||||||
sleep 30
|
sleep 35
|
||||||
echo "${minio_pids[@]}"
|
echo "${minio_pids[@]}"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,6 +69,10 @@ func (d *errorLocker) IsOnline() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *errorLocker) Expired(args dsync.LockArgs) (reply bool, err error) {
|
||||||
|
return false, errors.New("unable to check for lock expiration")
|
||||||
|
}
|
||||||
|
|
||||||
// localLocker implements Dsync.NetLocker
|
// localLocker implements Dsync.NetLocker
|
||||||
type localLocker struct {
|
type localLocker struct {
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
@ -202,6 +206,34 @@ func (l *localLocker) IsOnline() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) {
|
||||||
|
l.mutex.Lock()
|
||||||
|
defer l.mutex.Unlock()
|
||||||
|
|
||||||
|
// Lock found, proceed to verify if belongs to given uid.
|
||||||
|
if lri, ok := l.lockMap[args.Resource]; ok {
|
||||||
|
// Check whether uid is still active
|
||||||
|
for _, entry := range lri {
|
||||||
|
if entry.UID == args.UID {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
|
||||||
|
// Caller must hold 'l.mutex' lock.
|
||||||
|
func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
|
||||||
|
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
|
||||||
|
if lri, ok := l.lockMap[nlrip.name]; ok {
|
||||||
|
// Even if the entry exists, it may not be the same entry which was
|
||||||
|
// considered as expired, so we simply an attempt to remove it if its
|
||||||
|
// not possible there is nothing we need to do.
|
||||||
|
l.removeEntry(nlrip.name, nlrip.lri.UID, &lri)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newLocker(endpoint Endpoint) *localLocker {
|
func newLocker(endpoint Endpoint) *localLocker {
|
||||||
return &localLocker{
|
return &localLocker{
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
|
@ -47,6 +47,8 @@ func toLockError(err error) error {
|
|||||||
switch err.Error() {
|
switch err.Error() {
|
||||||
case errLockConflict.Error():
|
case errLockConflict.Error():
|
||||||
return errLockConflict
|
return errLockConflict
|
||||||
|
case errLockNotExpired.Error():
|
||||||
|
return errLockNotExpired
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -109,7 +111,7 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
|
|||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
return true, nil
|
return true, nil
|
||||||
case errLockConflict:
|
case errLockConflict, errLockNotExpired:
|
||||||
return false, nil
|
return false, nil
|
||||||
default:
|
default:
|
||||||
return false, err
|
return false, err
|
||||||
@ -136,6 +138,11 @@ func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error
|
|||||||
return client.restCall(lockRESTMethodUnlock, args)
|
return client.restCall(lockRESTMethodUnlock, args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Expired calls expired handler to check if lock args have expired.
|
||||||
|
func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err error) {
|
||||||
|
return client.restCall(lockRESTMethodExpired, args)
|
||||||
|
}
|
||||||
|
|
||||||
func closeLockers(lockers []dsync.NetLocker) {
|
func closeLockers(lockers []dsync.NetLocker) {
|
||||||
for _, locker := range lockers {
|
for _, locker := range lockers {
|
||||||
locker.Close()
|
locker.Close()
|
||||||
|
@ -31,6 +31,7 @@ const (
|
|||||||
lockRESTMethodRLock = "/rlock"
|
lockRESTMethodRLock = "/rlock"
|
||||||
lockRESTMethodUnlock = "/unlock"
|
lockRESTMethodUnlock = "/unlock"
|
||||||
lockRESTMethodRUnlock = "/runlock"
|
lockRESTMethodRUnlock = "/runlock"
|
||||||
|
lockRESTMethodExpired = "/expired"
|
||||||
|
|
||||||
// Unique ID of lock/unlock request.
|
// Unique ID of lock/unlock request.
|
||||||
lockRESTUID = "uid"
|
lockRESTUID = "uid"
|
||||||
@ -41,4 +42,7 @@ const (
|
|||||||
lockRESTResource = "resource"
|
lockRESTResource = "resource"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errLockConflict = errors.New("lock conflict")
|
var (
|
||||||
|
errLockConflict = errors.New("lock conflict")
|
||||||
|
errLockNotExpired = errors.New("lock not expired")
|
||||||
|
)
|
||||||
|
@ -18,6 +18,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
@ -127,6 +128,131 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExpiredHandler - query expired lock status.
|
||||||
|
func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !l.IsValid(w, r) {
|
||||||
|
l.writeErrorResponse(w, errors.New("Invalid request"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
lockArgs := getLockArgs(r)
|
||||||
|
|
||||||
|
l.ll.mutex.Lock()
|
||||||
|
defer l.ll.mutex.Unlock()
|
||||||
|
// Lock found, proceed to verify if belongs to given uid.
|
||||||
|
if lri, ok := l.ll.lockMap[lockArgs.Resource]; ok {
|
||||||
|
// Check whether uid is still active
|
||||||
|
for _, entry := range lri {
|
||||||
|
if entry.UID == lockArgs.UID {
|
||||||
|
l.writeErrorResponse(w, errLockNotExpired)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// nameLockRequesterInfoPair is a helper type for lock maintenance
|
||||||
|
type nameLockRequesterInfoPair struct {
|
||||||
|
name string
|
||||||
|
lri lockRequesterInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLongLivedLocks returns locks that are older than a certain time and
|
||||||
|
// have not been 'checked' for validity too soon enough
|
||||||
|
func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterInfoPair {
|
||||||
|
nlripMap := make(map[Endpoint][]nameLockRequesterInfoPair)
|
||||||
|
for endpoint, locker := range globalLockServers {
|
||||||
|
rslt := []nameLockRequesterInfoPair{}
|
||||||
|
locker.mutex.Lock()
|
||||||
|
for name, lriArray := range locker.lockMap {
|
||||||
|
for idx := range lriArray {
|
||||||
|
// Check whether enough time has gone by since last check
|
||||||
|
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
|
||||||
|
rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]})
|
||||||
|
lriArray[idx].TimeLastCheck = UTCNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nlripMap[endpoint] = rslt
|
||||||
|
locker.mutex.Unlock()
|
||||||
|
}
|
||||||
|
return nlripMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// lockMaintenance loops over locks that have been active for some time and checks back
|
||||||
|
// with the original server whether it is still alive or not
|
||||||
|
//
|
||||||
|
// Following logic inside ignores the errors generated for Dsync.Active operation.
|
||||||
|
// - server at client down
|
||||||
|
// - some network error (and server is up normally)
|
||||||
|
//
|
||||||
|
// We will ignore the error, and we will retry later to get a resolve on this lock
|
||||||
|
func lockMaintenance(interval time.Duration) {
|
||||||
|
// Validate if long lived locks are indeed clean.
|
||||||
|
// Get list of long lived locks to check for staleness.
|
||||||
|
for lendpoint, nlrips := range getLongLivedLocks(interval) {
|
||||||
|
for _, nlrip := range nlrips {
|
||||||
|
for _, ep := range globalEndpoints {
|
||||||
|
for _, endpoint := range ep.Endpoints {
|
||||||
|
if endpoint.String() == lendpoint.String() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c := newLockAPI(endpoint)
|
||||||
|
if !c.IsOnline() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call back to original server verify whether the lock is still active (based on name & uid)
|
||||||
|
expired, err := c.Expired(dsync.LockArgs{
|
||||||
|
UID: nlrip.lri.UID,
|
||||||
|
Resource: nlrip.name,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
c.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// For successful response, verify if lock was indeed active or stale.
|
||||||
|
if expired {
|
||||||
|
// The lock is no longer active at server that originated
|
||||||
|
// the lock, attempt to remove the lock.
|
||||||
|
globalLockServers[lendpoint].mutex.Lock()
|
||||||
|
// Purge the stale entry if it exists.
|
||||||
|
globalLockServers[lendpoint].removeEntryIfExists(nlrip)
|
||||||
|
globalLockServers[lendpoint].mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the connection regardless of the call response.
|
||||||
|
c.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start lock maintenance from all lock servers.
|
||||||
|
func startLockMaintenance() {
|
||||||
|
// Start with random sleep time, so as to avoid "synchronous checks" between servers
|
||||||
|
time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval)))
|
||||||
|
|
||||||
|
// Initialize a new ticker with a minute between each ticks.
|
||||||
|
ticker := time.NewTicker(lockMaintenanceInterval)
|
||||||
|
// Stop the timer upon service closure and cleanup the go-routine.
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Verifies every minute for locks held more than 2 minutes.
|
||||||
|
select {
|
||||||
|
case <-GlobalServiceDoneCh:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
lockMaintenance(lockValidityCheckInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// registerLockRESTHandlers - register lock rest router.
|
// registerLockRESTHandlers - register lock rest router.
|
||||||
func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
|
func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
|
||||||
queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource)
|
queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource)
|
||||||
@ -145,8 +271,11 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
|
|||||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
|
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
|
||||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
|
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
|
||||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
|
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
|
||||||
|
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)).Queries(queries...)
|
||||||
|
|
||||||
globalLockServers[endpoint] = lockServer.ll
|
globalLockServers[endpoint] = lockServer.ll
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go startLockMaintenance()
|
||||||
}
|
}
|
||||||
|
@ -109,9 +109,9 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err
|
|||||||
return status, err
|
return status, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rpcClient *ReconnectRPCClient) ForceUnlock(args LockArgs) (status bool, err error) {
|
func (rpcClient *ReconnectRPCClient) Expired(args LockArgs) (expired bool, err error) {
|
||||||
err = rpcClient.Call("Dsync.ForceUnlock", &args, &status)
|
err = rpcClient.Call("Dsync.Expired", &args, &expired)
|
||||||
return status, err
|
return expired, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rpcClient *ReconnectRPCClient) String() string {
|
func (rpcClient *ReconnectRPCClient) String() string {
|
||||||
|
@ -51,6 +51,9 @@ type NetLocker interface {
|
|||||||
// * an error on failure of unlock request operation.
|
// * an error on failure of unlock request operation.
|
||||||
Unlock(args LockArgs) (bool, error)
|
Unlock(args LockArgs) (bool, error)
|
||||||
|
|
||||||
|
// Expired returns if current lock args has expired.
|
||||||
|
Expired(args LockArgs) (bool, error)
|
||||||
|
|
||||||
// Returns underlying endpoint of this lock client instance.
|
// Returns underlying endpoint of this lock client instance.
|
||||||
String() string
|
String() string
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user