Add force unlock support (#12274)

Add admin API to force unlock some stale locks

Signed-off-by: Anis Elleuch <anis@min.io>
This commit is contained in:
Anis Elleuch 2021-05-11 17:09:39 +01:00 committed by Harshavardhana
parent 0e1dce37ad
commit ed264449b1
10 changed files with 129 additions and 7 deletions

View File

@ -42,6 +42,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/logger/message/log"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/handlers"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
@ -452,6 +453,45 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
writeSuccessResponseJSON(w, jsonBytes)
}
// ForceUnlockHandler force unlocks requested resource
func (a adminAPIHandlers) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ForceUnlock")
defer logger.AuditLog(w, r, "ForceUnlock", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ForceUnlockAdminAction)
if objectAPI == nil {
return
}
z, ok := objectAPI.(*erasureServerSets)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
var args dsync.LockArgs
lockersMap := make(map[string]dsync.NetLocker)
for _, path := range strings.Split(vars["paths"], ",") {
if path == "" {
continue
}
args.Resources = append(args.Resources, path)
lockers, _ := z.serverSets[0].getHashedSet(path).getLockers()
for _, locker := range lockers {
if locker != nil {
lockersMap[locker.String()] = locker
}
}
}
for _, locker := range lockersMap {
locker.ForceUnlock(ctx, args)
}
}
// StartProfilingResult contains the status of the starting
// profiling action in a given server
type StartProfilingResult struct {

View File

@ -214,6 +214,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
// Top locks
if globalIsDistErasure {
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/top/locks").HandlerFunc(httpTraceHdrs(adminAPI.TopLocksHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/force-unlock").
Queries("paths", "{paths:.*}").HandlerFunc(httpTraceHdrs(adminAPI.ForceUnlockHandler))
}
// HTTP Trace

View File

@ -215,6 +215,23 @@ func (l *localLocker) IsLocal() bool {
return true
}
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
l.mutex.Lock()
defer l.mutex.Unlock()
if len(args.UID) != 0 {
return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
}
for _, resource := range args.Resources {
delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock)
}
return true, nil
}
}
func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
select {
case <-ctx.Done():

View File

@ -137,6 +137,11 @@ func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs)
return client.restCall(ctx, lockRESTMethodExpired, args)
}
// ForceUnlock calls force unlock handler to forcibly unlock an active lock.
func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
return client.restCall(ctx, lockRESTMethodForceUnlock, args)
}
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
if endpoint.IsLocal {
return globalLockServers[endpoint]

View File

@ -21,18 +21,19 @@ import (
)
const (
lockRESTVersion = "v4" // Add Quorum query param
lockRESTVersion = "v5" // Add Force unlock
lockRESTVersionPrefix = SlashSeparator + lockRESTVersion
lockRESTPrefix = minioReservedBucketPath + "/lock"
)
const (
lockRESTMethodHealth = "/health"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodHealth = "/health"
lockRESTMethodLock = "/lock"
lockRESTMethodRLock = "/rlock"
lockRESTMethodUnlock = "/unlock"
lockRESTMethodRUnlock = "/runlock"
lockRESTMethodExpired = "/expired"
lockRESTMethodForceUnlock = "/force-unlock"
// lockRESTOwner represents owner UUID
lockRESTOwner = "owner"

View File

@ -209,6 +209,25 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
}
}
// ForceUnlockHandler - query expired lock status.
func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) {
l.writeErrorResponse(w, errors.New("invalid request"))
return
}
args, err := getLockArgs(r)
if err != nil {
l.writeErrorResponse(w, err)
return
}
if _, err = l.ll.ForceUnlock(r.Context(), args); err != nil {
l.writeErrorResponse(w, err)
return
}
}
// nameLockRequesterInfoPair is a helper type for lock maintenance
type nameLockRequesterInfoPair struct {
name string
@ -378,6 +397,7 @@ func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointSer
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler))
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
globalLockServers[endpoint] = lockServer.ll

View File

@ -119,6 +119,11 @@ func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs)
return expired, err
}
func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (status bool, err error) {
err = rpcClient.Call("Dsync.ForceUnlock", &args, &status)
return status, err
}
func (rpcClient *ReconnectRPCClient) String() string {
return "http://" + rpcClient.addr + "/" + rpcClient.endpoint
}

View File

@ -60,6 +60,9 @@ type NetLocker interface {
// * an error on failure of unlock request operation.
Unlock(args LockArgs) (bool, error)
// Force unlock a resource
ForceUnlock(ctx context.Context, args LockArgs) (bool, error)
// Expired returns if current lock args has expired.
Expired(ctx context.Context, args LockArgs) (bool, error)

View File

@ -35,6 +35,8 @@ const (
DataUsageInfoAdminAction = "admin:DataUsageInfo"
// TopLocksAdminAction - allow listing top locks
TopLocksAdminAction = "admin:TopLocksInfo"
// ForceUnlockAdminAction - allow force unlock locks
ForceUnlockAdminAction = "admin:ForceUnlock"
// ProfilingAdminAction - allow profiling
ProfilingAdminAction = "admin:Profiling"
// TraceAdminAction - allow listing server trace
@ -127,6 +129,7 @@ var supportedAdminActions = map[AdminAction]struct{}{
StorageInfoAdminAction: {},
DataUsageInfoAdminAction: {},
TopLocksAdminAction: {},
ForceUnlockAdminAction: {},
ProfilingAdminAction: {},
TraceAdminAction: {},
ConsoleLogAdminAction: {},
@ -178,6 +181,7 @@ var adminActionConditionKeyMap = map[Action]condition.KeySet{
OBDInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
BandwidthMonitorAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
TopLocksAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
ForceUnlockAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
ProfilingAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
TraceAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
ConsoleLogAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),

View File

@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
@ -102,3 +103,27 @@ func (adm *AdminClient) TopLocksWithOpts(ctx context.Context, opts TopLockOpts)
func (adm *AdminClient) TopLocks(ctx context.Context) (LockEntries, error) {
return adm.TopLocksWithOpts(ctx, TopLockOpts{Count: 10})
}
// ForceUnlock force unlocks input paths...
func (adm *AdminClient) ForceUnlock(ctx context.Context, paths ...string) error {
// Execute POST on /minio/admin/v3/force-unlock
queryVals := make(url.Values)
queryVals.Set("paths", strings.Join(paths, ","))
resp, err := adm.executeMethod(ctx,
http.MethodPost,
requestData{
relPath: adminAPIPrefix + "/force-unlock",
queryValues: queryVals,
},
)
defer closeResponse(resp)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp)
}
return nil
}