fix: expire locks only on participating lockers (#11335)

additionally add a new ForceUnlock API to
allow forcibly unlocking locks if possible.
Harshavardhana 2021-01-25 10:01:27 -08:00 committed by GitHub
parent bd8020aba8
commit 9cdd981ce7
15 changed files with 227 additions and 107 deletions

View File

@ -42,6 +42,7 @@ import (
"github.com/minio/minio/cmd/logger/message/log"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bandwidth"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/handlers"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
@ -403,6 +404,45 @@ type PeerLocks struct {
Locks map[string][]lockRequesterInfo
}
// ForceUnlockHandler force unlocks requested resource
func (a adminAPIHandlers) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ForceUnlock")
defer logger.AuditLog(w, r, "ForceUnlock", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ForceUnlockAdminAction)
if objectAPI == nil {
return
}
z, ok := objectAPI.(*erasureServerPools)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
var args dsync.LockArgs
lockersMap := make(map[string]dsync.NetLocker)
for _, path := range strings.Split(vars["paths"], ",") {
if path == "" {
continue
}
args.Resources = append(args.Resources, path)
lockers, _ := z.serverPools[0].getHashedSet(path).getLockers()
for _, locker := range lockers {
if locker != nil {
lockersMap[locker.String()] = locker
}
}
}
for _, locker := range lockersMap {
locker.ForceUnlock(ctx, args)
}
}
// TopLocksHandler Get list of locks in use
func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "TopLocks")

View File

@ -189,10 +189,12 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler)).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
}
// -- Top APIs --
// Top locks
if globalIsDistErasure {
// Top locks
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/top/locks").HandlerFunc(httpTraceHdrs(adminAPI.TopLocksHandler))
// Force unlocks paths
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/force-unlock").
Queries("paths", "{paths:.*}").HandlerFunc(httpTraceHdrs(adminAPI.ForceUnlockHandler))
}
// HTTP Trace

View File

@ -18,6 +18,7 @@ package cmd
import (
"net/http"
"time"
"github.com/minio/minio/pkg/madmin"
)
@ -45,7 +46,7 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req
}
_, present := network[nodeName]
if !present {
if err := isServerResolvable(endpoint); err == nil {
if err := isServerResolvable(endpoint, time.Second); err == nil {
network[nodeName] = "online"
} else {
network[nodeName] = "offline"
@ -88,7 +89,7 @@ func getLocalDisks(endpointServerPools EndpointServerPools) []madmin.Disk {
}
_, present := network[nodeName]
if !present {
if err := isServerResolvable(endpoint); err == nil {
if err := isServerResolvable(endpoint, time.Second); err == nil {
network[nodeName] = "online"
} else {
network[nodeName] = "offline"

View File

@ -34,7 +34,6 @@ import (
"github.com/minio/minio/cmd/config/storageclass"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -163,10 +162,6 @@ func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI {
return res
}
func (z *erasureServerPools) GetAllLockers() []dsync.NetLocker {
return z.serverPools[0].GetAllLockers()
}
func (z *erasureServerPools) SetDriveCounts() []int {
setDriveCounts := make([]int, len(z.serverPools))
for i := range z.serverPools {

View File

@ -292,27 +292,6 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
}
}
// GetAllLockers return a list of all lockers for all sets.
func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
var allLockers []dsync.NetLocker
lockEpSet := set.NewStringSet()
for _, lockers := range s.erasureLockers {
for _, locker := range lockers {
if locker == nil || !locker.IsOnline() {
// Skip any offline lockers.
continue
}
if lockEpSet.Contains(locker.String()) {
// Skip duplicate lockers.
continue
}
lockEpSet.Add(locker.String())
allLockers = append(allLockers, locker)
}
}
return allLockers
}
func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
return func() ([]dsync.NetLocker, string) {
lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex]))

View File

@ -27,11 +27,13 @@ import (
// lockRequesterInfo stores various info from the client for each lock that is requested.
type lockRequesterInfo struct {
Name string // name of the resource lock was requested for
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastCheck time.Time // Timestamp for last check of validity of lock.
Source string // Contains line, function and filename requesting the lock.
Group bool // indicates if it was a group lock.
// Owner represents the UUID of the owner who originally requested the lock
// useful in expiry.
Owner string
@ -91,12 +93,14 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
for _, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
Name: resource,
Writer: true,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastCheck: UTCNow(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
},
}
@ -148,7 +152,9 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR
func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0]
lrInfo := lockRequesterInfo{
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
@ -157,7 +163,6 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
TimeLastCheck: UTCNow(),
Quorum: args.Quorum,
}
resource := args.Resources[0]
if lri, ok := l.lockMap[resource]; ok {
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
@ -214,6 +219,23 @@ func (l *localLocker) IsLocal() bool {
return true
}
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
}
for _, resource := range args.Resources {
delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock)
}
return true, nil
}
}
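For reference, a minimal sketch (not part of the commit) of how the new ForceUnlock behaves on the local locker. It assumes it runs inside package cmd with access to the unexported localLocker type, and that "context", "errors" and the dsync package are imported:

// Sketch only, assuming package cmd and an initialized *localLocker.
func exampleForceUnlock(ctx context.Context, l *localLocker) error {
	// A force unlock must not carry a UID; the handler above rejects it.
	if _, err := l.ForceUnlock(ctx, dsync.LockArgs{UID: "1234"}); err == nil {
		return errors.New("expected an error for a non-empty UID")
	}
	// With an empty UID, every entry for the listed resources is dropped,
	// irrespective of whether it was a read or a write lock.
	_, err := l.ForceUnlock(ctx, dsync.LockArgs{
		Resources: []string{"bucket/object"},
	})
	return err
}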
func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
select {
case <-ctx.Done():
@ -222,38 +244,42 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0] // expiry check is always per resource.
// Lock found, proceed to verify if belongs to given uid.
for _, resource := range args.Resources {
if lri, ok := l.lockMap[resource]; ok {
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID && entry.Owner == args.Owner {
if ep, ok := globalRemoteEndpoints[args.Owner]; ok {
if err = isServerResolvable(ep); err != nil {
return true, nil
}
}
return false, nil
}
lri, ok := l.lockMap[resource]
if !ok {
return true, nil
}
// Check whether uid is still active
for _, entry := range lri {
if entry.UID == args.UID && entry.Owner == args.Owner {
ep := globalRemoteEndpoints[args.Owner]
if !ep.IsLocal {
// check if the owner is online
return isServerResolvable(ep, 250*time.Millisecond) != nil, nil
}
return false, nil
}
}
return true, nil
}
}
// Similar to removeEntry but removes an entry only if the lock entry exists in the map.
// Acquires 'l.mutex' internally.
func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
func (l *localLocker) removeEntryIfExists(lrip lockRequesterInfo) {
l.mutex.Lock()
defer l.mutex.Unlock()
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
if lri, ok := l.lockMap[nlrip.name]; ok {
if lri, ok := l.lockMap[lrip.Name]; ok {
// Even if the entry exists, it may not be the same entry that was
// considered expired, so we simply attempt to remove it; if removal is
// not possible, there is nothing more we need to do.
l.removeEntry(nlrip.name, dsync.LockArgs{Owner: nlrip.lri.Owner, UID: nlrip.lri.UID}, &lri)
l.removeEntry(lrip.Name, dsync.LockArgs{Owner: lrip.Owner, UID: lrip.UID}, &lri)
}
}

View File

@ -135,6 +135,11 @@ func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs)
return client.restCall(ctx, lockRESTMethodExpired, args)
}
// ForceUnlock calls force unlock handler to forcibly unlock an active lock.
func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
return client.restCall(ctx, lockRESTMethodForceUnlock, args)
}
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
if endpoint.IsLocal {
return globalLockServer

View File

@ -21,18 +21,19 @@ import (
)
const (
lockRESTVersion = "v4" // Add Quorum query param
lockRESTVersion = "v5" // Add Quorum query param
lockRESTVersionPrefix = SlashSeparator + lockRESTVersion
lockRESTPrefix = minioReservedBucketPath + "/lock"
)
const (
lockRESTMethodHealth = "/health"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodHealth = "/health"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodForceUnlock = "/force-unlock"
// lockRESTOwner represents owner UUID
lockRESTOwner = "owner"

View File

@ -24,6 +24,7 @@ import (
"net/http"
"sort"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
@ -32,10 +33,10 @@ import (
const (
// Lock maintenance interval.
lockMaintenanceInterval = 30 * time.Second
lockMaintenanceInterval = 10 * time.Second
// Lock validity check interval.
lockValidityCheckInterval = 5 * time.Second
lockValidityCheckInterval = 30 * time.Second
)
// To abstract a node over network.
@ -98,7 +99,7 @@ func (l *lockRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
// LockHandler - Acquires a lock.
func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
@ -121,7 +122,7 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
// UnlockHandler - releases the acquired lock.
func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
@ -143,7 +144,7 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
// LockHandler - Acquires an RLock.
func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
@ -166,7 +167,7 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
// RUnlockHandler - releases the acquired read lock.
func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
@ -184,10 +185,29 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
}
}
// ForceUnlockHandler - forcibly releases locks held on the requested resources.
func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}
if _, err = l.ll.ForceUnlock(r.Context(), args); err != nil {
l.writeErrorResponse(w, err)
return
}
}
// ExpiredHandler - query expired lock status.
func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("Invalid request"))
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
@ -208,31 +228,22 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
}
}
// nameLockRequesterInfoPair is a helper type for lock maintenance
type nameLockRequesterInfoPair struct {
name string
lri lockRequesterInfo
}
// getLongLivedLocks returns locks that are older than a certain time and
// have not been 'checked' for validity too recently.
func getLongLivedLocks(interval time.Duration) []nameLockRequesterInfoPair {
nlrip := []nameLockRequesterInfoPair{}
func getLongLivedLocks(interval time.Duration) []lockRequesterInfo {
lrips := []lockRequesterInfo{}
globalLockServer.mutex.Lock()
for name, lriArray := range globalLockServer.lockMap {
for _, lriArray := range globalLockServer.lockMap {
for idx := range lriArray {
// Check whether enough time has gone by since last check
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
nlrip = append(nlrip, nameLockRequesterInfoPair{
name: name,
lri: lriArray[idx],
})
lrips = append(lrips, lriArray[idx])
lriArray[idx].TimeLastCheck = UTCNow()
}
}
}
globalLockServer.mutex.Unlock()
return nlrip
return lrips
}
// lockMaintenance loops over locks that have been active for some time and checks back
@ -261,47 +272,68 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) {
nlk, ok := nlripsMap[name]
if !ok {
if ok {
nlk.locks++
nlripsMap[name] = nlk
} else {
nlripsMap[name] = nlock{
locks: 1,
writer: writer,
}
} else {
nlk.locks++
nlripsMap[name] = nlk
}
}
// Validate if long lived locks are indeed clean.
// Get list of long lived locks to check for staleness.
nlrips := getLongLivedLocks(interval)
nlripsMap := make(map[string]nlock, len(nlrips))
for _, nlrip := range nlrips {
for _, c := range z.GetAllLockers() {
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
// Call back to original server verify whether the lock is
// still active (based on name & uid)
expired, err := c.Expired(ctx, dsync.LockArgs{
Owner: nlrip.lri.Owner,
UID: nlrip.lri.UID,
Resources: []string{nlrip.name},
})
cancel()
if err != nil {
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
continue
}
if !expired {
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
}
lrips := getLongLivedLocks(interval)
lripsMap := make(map[string]nlock, len(lrips))
var mutex sync.Mutex // mutex for lripsMap updates
for _, lrip := range lrips {
// fetch the lockers that participated in handing
// out locks for `lrip.Name`
var lockers []dsync.NetLocker
if lrip.Group {
lockers, _ = z.serverPools[0].getHashedSet("").getLockers()
} else {
lockers, _ = z.serverPools[0].getHashedSet(lrip.Name).getLockers()
}
var wg sync.WaitGroup
wg.Add(len(lockers))
for _, c := range lockers {
go func(lrip lockRequesterInfo, c dsync.NetLocker) {
defer wg.Done()
ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second)
// Call back to all participating servers and verify
// whether each of them still considers the lock
// active; if not, expire it.
expired, err := c.Expired(ctx, dsync.LockArgs{
Owner: lrip.Owner,
UID: lrip.UID,
Resources: []string{lrip.Name},
})
cancel()
if err != nil {
mutex.Lock()
updateNlocks(lripsMap, lrip.Name, lrip.Writer)
mutex.Unlock()
return
}
if !expired {
mutex.Lock()
updateNlocks(lripsMap, lrip.Name, lrip.Writer)
mutex.Unlock()
}
}(lrip, c)
}
wg.Wait()
// If fewer lockers than the quorum still hold the lock, it has expired.
if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum {
if lripsMap[lrip.Name].locks < lrip.Quorum {
// Purge the stale entry if it exists.
globalLockServer.removeEntryIfExists(nlrip)
globalLockServer.removeEntryIfExists(lrip)
}
}
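A tiny illustrative sketch (names are hypothetical, not from the commit) of the expiry rule applied above: a reply error or a "not expired" answer both count as the locker still holding the lock, and the entry is purged only when fewer lockers than its recorded quorum still hold it.

// Sketch only; activeHolders is the number of lockers that either answered
// "not expired" or failed to answer, mirroring lripsMap[lrip.Name].locks.
func shouldPurge(activeHolders, quorum int) bool {
	return activeHolders < quorum
}

// Example: a write lock taken with Quorum=3 across 4 participating lockers.
//   shouldPurge(2, 3) == true  -> stale entry is removed
//   shouldPurge(3, 3) == false -> lock is left in place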
@ -360,6 +392,7 @@ func registerLockRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceAll(lockServer.ForceUnlockHandler))
globalLockServer = lockServer.ll

View File

@ -165,7 +165,7 @@ var errErasureV3ThisEmpty = fmt.Errorf("Erasure format version 3 has This field
// isServerResolvable - checks if the endpoint is resolvable
// by sending a naked HTTP request with liveness checks.
func isServerResolvable(endpoint Endpoint) error {
func isServerResolvable(endpoint Endpoint, timeout time.Duration) error {
serverURL := &url.URL{
Scheme: endpoint.Scheme,
Host: endpoint.Host,
@ -199,7 +199,7 @@ func isServerResolvable(endpoint Endpoint) error {
}
defer httpClient.CloseIdleConnections()
ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second)
ctx, cancel := context.WithTimeout(GlobalContext, timeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, serverURL.String(), nil)
if err != nil {
@ -244,7 +244,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
return nil, nil, fmt.Errorf("Disk %s: %w", endpoints[i], err)
}
if retryCount >= 5 {
logger.Info("Unable to connect to %s: %v\n", endpoints[i], isServerResolvable(endpoints[i]))
logger.Info("Unable to connect to %s: %v\n", endpoints[i], isServerResolvable(endpoints[i], time.Second))
}
}
}

View File

@ -144,9 +144,10 @@ func serverHandleCmdArgs(ctx *cli.Context) {
for _, z := range globalEndpoints {
for _, ep := range z.Endpoints {
if ep.IsLocal {
continue
globalRemoteEndpoints[GetLocalPeer(globalEndpoints)] = ep
} else {
globalRemoteEndpoints[ep.Host] = ep
}
globalRemoteEndpoints[ep.Host] = ep
}
}
logger.FatalIf(err, "Invalid command line arguments")

View File

@ -119,6 +119,11 @@ func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs)
return expired, err
}
func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (reply bool, err error) {
err = rpcClient.Call("Dsync.ForceUnlock", &args, &reply)
return reply, err
}
func (rpcClient *ReconnectRPCClient) String() string {
return "http://" + rpcClient.addr + "/" + rpcClient.endpoint
}

View File

@ -63,6 +63,11 @@ type NetLocker interface {
// Expired returns if current lock args has expired.
Expired(ctx context.Context, args LockArgs) (bool, error)
// Unlock (read/write) forcefully for given LockArgs. It should return
// * a boolean to indicate success/failure of the operation
// * an error on failure of unlock request operation.
ForceUnlock(ctx context.Context, args LockArgs) (bool, error)
// Returns underlying endpoint of this lock client instance.
String() string
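From a consumer's point of view, any NetLocker implementation now has to provide ForceUnlock as well. A hypothetical decorator (not part of the commit) that embeds an existing NetLocker shows the shape of the new method; "auditedLocker" and the log call are illustrative only:

// Sketch only; assumes "context", "log" and the dsync package are imported.
type auditedLocker struct {
	dsync.NetLocker // embeds Lock, Unlock, Expired, String, ... unchanged
}

func (a auditedLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (bool, error) {
	log.Printf("force-unlock requested for %v", args.Resources)
	return a.NetLocker.ForceUnlock(ctx, args)
}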

View File

@ -33,6 +33,8 @@ const (
StorageInfoAdminAction = "admin:StorageInfo"
// DataUsageInfoAdminAction - allow listing data usage info
DataUsageInfoAdminAction = "admin:DataUsageInfo"
// ForceUnlockAdminAction - allow force unlocking locks
ForceUnlockAdminAction = "admin:ForceUnlock"
// TopLocksAdminAction - allow listing top locks
TopLocksAdminAction = "admin:TopLocksInfo"
// ProfilingAdminAction - allow profiling

View File

@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
@ -63,6 +64,30 @@ type TopLockOpts struct {
Stale bool
}
// ForceUnlock force unlocks input paths...
func (adm *AdminClient) ForceUnlock(ctx context.Context, paths ...string) error {
// Execute POST on /minio/admin/v3/force-unlock
queryVals := make(url.Values)
queryVals.Set("paths", strings.Join(paths, ","))
resp, err := adm.executeMethod(ctx,
http.MethodPost,
requestData{
relPath: adminAPIPrefix + "/force-unlock",
queryValues: queryVals,
},
)
defer closeResponse(resp)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp)
}
return nil
}
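A short consumer-side usage sketch of the method above; the endpoint and credentials are placeholders:

package main

import (
	"context"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials.
	madmClnt, err := madmin.New("minio.example.net:9000", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatalln(err)
	}

	// The paths are joined with commas and sent as the `paths` query
	// parameter, exactly as ForceUnlock above builds the request.
	if err := madmClnt.ForceUnlock(context.Background(), "bucket/object1", "bucket/object2"); err != nil {
		log.Fatalln(err)
	}
}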
// TopLocksWithOpts - returns the count number of oldest locks currently active on the server.
// additionally we can also enable `stale` to get stale locks currently present on server.
func (adm *AdminClient) TopLocksWithOpts(ctx context.Context, opts TopLockOpts) (LockEntries, error) {