fix: add lock ownership to expire locks (#10571)

- Add owner information for expiring, locking, and unlocking a resource
- TopLocks now returns locks held in quorum by default, and provides
  a way to capture stale locks as well with `?stale=true` (see the
  usage sketch below)
- Simplify the quorum handling for locks so it is no longer derived from
  the storage class, because it was challenging to keep that consistent
  across all situations.
- And other tiny simplifications to resetting locks.
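
For illustration only (not part of this commit), a minimal usage sketch of the new admin API surface; the endpoint and credentials below are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials; adjust for your deployment.
	adm, err := madmin.New("localhost:9000", "ACCESS_KEY", "SECRET_KEY", false)
	if err != nil {
		log.Fatalln(err)
	}

	// Count maps to ?count=10, Stale maps to the new ?stale=true query
	// parameter; stale entries are locks held by fewer servers than the
	// reported Quorum.
	entries, err := adm.TopLocksWithOpts(context.Background(), madmin.TopLockOpts{
		Count: 10,
		Stale: true,
	})
	if err != nil {
		log.Fatalln(err)
	}

	for _, e := range entries {
		fmt.Println(e.Timestamp, e.Type, e.Resource, e.Owner, e.Quorum, e.ServerList)
	}
}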
Harshavardhana 2020-09-25 19:21:52 -07:00 committed by GitHub
parent 66b4a862e0
commit eafa775952
16 changed files with 207 additions and 174 deletions

View File

@@ -336,23 +336,26 @@ func (a adminAPIHandlers) DataUsageInfoHandler(w http.ResponseWriter, r *http.Re
 	writeSuccessResponseJSON(w, dataUsageInfoJSON)
 }
 
-func lriToLockEntry(l lockRequesterInfo, resource, server string) *madmin.LockEntry {
+func lriToLockEntry(l lockRequesterInfo, resource, server string, rquorum, wquorum int) *madmin.LockEntry {
 	entry := &madmin.LockEntry{
 		Timestamp:  l.Timestamp,
 		Resource:   resource,
 		ServerList: []string{server},
 		Source:     l.Source,
+		Owner:      l.Owner,
 		ID:         l.UID,
 	}
 	if l.Writer {
 		entry.Type = "WRITE"
+		entry.Quorum = wquorum
 	} else {
 		entry.Type = "READ"
+		entry.Quorum = rquorum
 	}
 	return entry
 }
 
-func topLockEntries(peerLocks []*PeerLocks, count int) madmin.LockEntries {
+func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, stale bool) madmin.LockEntries {
 	entryMap := make(map[string]*madmin.LockEntry)
 	for _, peerLock := range peerLocks {
 		if peerLock == nil {
@@ -364,20 +367,26 @@ func topLockEntries(peerLocks []*PeerLocks, count int) madmin.LockEntries {
 					if val, ok := entryMap[lockReqInfo.UID]; ok {
 						val.ServerList = append(val.ServerList, peerLock.Addr)
 					} else {
-						entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr)
+						entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr, rquorum, wquorum)
 					}
 				}
 			}
 		}
 	}
-	var lockEntries = make(madmin.LockEntries, 0, len(entryMap))
+	var lockEntries madmin.LockEntries
 	for _, v := range entryMap {
-		if len(lockEntries) == count {
-			break
-		}
-		lockEntries = append(lockEntries, *v)
+		if stale {
+			lockEntries = append(lockEntries, *v)
+			continue
+		}
+		if len(v.ServerList) >= v.Quorum {
+			lockEntries = append(lockEntries, *v)
+		}
 	}
 	sort.Sort(lockEntries)
+	if len(lockEntries) > count {
+		lockEntries = lockEntries[:count]
+	}
 	return lockEntries
 }
@@ -407,21 +416,14 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
 			return
 		}
 	}
 
-	peerLocks := globalNotificationSys.GetLocks(ctx)
-	// Once we have received all the locks currently used from peers
-	// add the local peer locks list as well.
-	var getRespLocks GetLocksResp
-	for _, llocker := range globalLockServers {
-		getRespLocks = append(getRespLocks, llocker.DupLockMap())
-	}
-	peerLocks = append(peerLocks, &PeerLocks{
-		Addr:  getHostName(r),
-		Locks: getRespLocks,
-	})
+	stale := r.URL.Query().Get("stale") == "true" // list also stale locks
 
-	topLocks := topLockEntries(peerLocks, count)
+	peerLocks := globalNotificationSys.GetLocks(ctx, r)
+
+	rquorum := getReadQuorum(objectAPI.SetDriveCount())
+	wquorum := getWriteQuorum(objectAPI.SetDriveCount())
+
+	topLocks := topLockEntries(peerLocks, count, rquorum, wquorum, stale)
 
 	// Marshal API response
 	jsonBytes, err := json.Marshal(topLocks)

View File

@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"math/rand"
 	"os"
 	"path"
 	"strconv"
@@ -49,10 +50,6 @@ const (
 	healDeleteDangling    = true
 	healFolderIncludeProb = 32  // Include a clean folder one in n cycles.
 	healObjectSelectProb  = 512 // Overall probability of a file being scanned; one in n.
-
-	// sleep for an hour after a lock timeout
-	// before retrying to acquire lock again.
-	dataCrawlerLeaderLockTimeoutSleepInterval = time.Hour
 )
 
 var (
@@ -73,10 +70,11 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 	// Make sure only 1 crawler is running on the cluster.
 	locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock")
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	for {
 		err := locker.GetLock(dataCrawlerLeaderLockTimeout)
 		if err != nil {
-			time.Sleep(dataCrawlerLeaderLockTimeoutSleepInterval)
+			time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
 			continue
 		}
 		break

View File

@@ -66,6 +66,9 @@ type erasureSets struct {
 	// Distributed locker clients.
 	erasureLockers setsDsyncLockers
 
+	// Distributed lock owner (constant per running instance).
+	erasureLockOwner string
+
 	// List of endpoints provided on the command line.
 	endpoints Endpoints
@@ -261,11 +264,11 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
 	}
 }
 
-func (s *erasureSets) GetLockers(setIndex int) func() []dsync.NetLocker {
-	return func() []dsync.NetLocker {
+func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
+	return func() ([]dsync.NetLocker, string) {
 		lockers := make([]dsync.NetLocker, s.setDriveCount)
 		copy(lockers, s.erasureLockers[setIndex])
-		return lockers
+		return lockers, s.erasureLockOwner
 	}
 }
@@ -308,6 +311,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		sets:             make([]*erasureObjects, setCount),
 		erasureDisks:     make([][]StorageAPI, setCount),
 		erasureLockers:   make([][]dsync.NetLocker, setCount),
+		erasureLockOwner: mustGetUUID(),
 		endpoints:        endpoints,
 		endpointStrings:  endpointStrings,
 		setCount:         setCount,

View File

@@ -52,7 +52,7 @@ type erasureObjects struct {
 	getDisks func() []StorageAPI
 
 	// getLockers returns list of remote and local lockers.
-	getLockers func() []dsync.NetLocker
+	getLockers func() ([]dsync.NetLocker, string)
 
 	// getEndpoints returns list of endpoint strings belonging this set.
 	// some may be local and some remote.

View File

@@ -32,6 +32,9 @@ type lockRequesterInfo struct {
 	Timestamp     time.Time // Timestamp set at the time of initialization.
 	TimeLastCheck time.Time // Timestamp for last check of validity of lock.
 	Source        string    // Contains line, function and filename reqesting the lock.
+	// Owner represents the UUID of the owner who originally requested the lock
+	// useful in expiry.
+	Owner string
 }
 
 // isWriteLock returns whether the lock is a write or read lock.
@@ -73,10 +76,6 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
 }
 
 func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
@@ -93,6 +92,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
 		{
 			Writer:        true,
 			Source:        args.Source,
+			Owner:         args.Owner,
 			UID:           args.UID,
 			Timestamp:     UTCNow(),
 			TimeLastCheck: UTCNow(),
@@ -100,7 +100,6 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
 		}
 	}
 	return true, nil
-	}
 }
 
 func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
@@ -113,7 +112,7 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
 	}
 	for _, resource := range args.Resources {
 		lri := l.lockMap[resource]
-		if !l.removeEntry(resource, args.UID, &lri) {
+		if !l.removeEntry(resource, args, &lri) {
 			return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource)
 		}
 	}
@@ -124,10 +123,10 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
 // removeEntry based on the uid of the lock message, removes a single entry from the
 // lockRequesterInfo array or the whole array from the map (in case of a write lock
 // or last read lock)
-func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool {
+func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool {
 	// Find correct entry to remove based on uid.
 	for index, entry := range *lri {
-		if entry.UID == uid {
+		if entry.UID == args.UID && entry.Owner == args.Owner {
 			if len(*lri) == 1 {
 				// Remove the write lock.
 				delete(l.lockMap, name)
@@ -145,15 +144,12 @@ func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bo
 }
 
 func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	lrInfo := lockRequesterInfo{
 		Writer:        false,
 		Source:        args.Source,
+		Owner:         args.Owner,
 		UID:           args.UID,
 		Timestamp:     UTCNow(),
 		TimeLastCheck: UTCNow(),
@@ -170,7 +166,6 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
 		reply = true
 	}
 	return reply, nil
-	}
 }
 
 func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
@@ -187,7 +182,7 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
 			// A write-lock is held, cannot release a read lock
 			return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
 		}
-		if !l.removeEntry(resource, args.UID, &lri) {
+		if !l.removeEntry(resource, args, &lri) {
 			return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
 		}
 		return reply, nil
@@ -226,7 +221,7 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
 		if lri, ok := l.lockMap[resource]; ok {
 			// Check whether uid is still active
 			for _, entry := range lri {
-				if entry.UID == args.UID {
+				if entry.UID == args.UID && entry.Owner == args.Owner {
 					return false, nil
 				}
 			}
@@ -244,7 +239,7 @@ func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
 		// Even if the entry exists, it may not be the same entry which was
 		// considered as expired, so we simply an attempt to remove it if its
 		// not possible there is nothing we need to do.
-		l.removeEntry(nlrip.name, nlrip.lri.UID, &lri)
+		l.removeEntry(nlrip.name, dsync.LockArgs{Owner: nlrip.lri.Owner, UID: nlrip.lri.UID}, &lri)
 	}
 }

View File

@@ -87,6 +87,7 @@ func (client *lockRESTClient) Close() error {
 func (client *lockRESTClient) restCall(ctx context.Context, call string, args dsync.LockArgs) (reply bool, err error) {
 	values := url.Values{}
 	values.Set(lockRESTUID, args.UID)
+	values.Set(lockRESTOwner, args.Owner)
 	values.Set(lockRESTSource, args.Source)
 	var buffer bytes.Buffer
 	for _, resource := range args.Resources {

View File

@@ -34,8 +34,12 @@ const (
 	lockRESTMethodRUnlock = "/runlock"
 	lockRESTMethodExpired = "/expired"
 
+	// lockRESTOwner represents owner UUID
+	lockRESTOwner = "owner"
+
 	// Unique ID of lock/unlock request.
 	lockRESTUID = "uid"
 
 	// Source contains the line number, function and file name of the code
 	// on the client node that requested the lock.
 	lockRESTSource = "source"

View File

@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+
+	"github.com/minio/minio/pkg/dsync"
 )
 
 // Helper function to create a lock server for testing
@@ -53,12 +55,14 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 	defer os.RemoveAll(testPath)
 
 	lockRequesterInfo1 := lockRequesterInfo{
+		Owner:         "owner",
 		Writer:        true,
 		UID:           "0123-4567",
 		Timestamp:     UTCNow(),
 		TimeLastCheck: UTCNow(),
 	}
 	lockRequesterInfo2 := lockRequesterInfo{
+		Owner:         "owner",
 		Writer:        true,
 		UID:           "89ab-cdef",
 		Timestamp:     UTCNow(),
@@ -73,11 +77,17 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 	lri := locker.ll.lockMap["name"]
 
 	// test unknown uid
-	if locker.ll.removeEntry("name", "unknown-uid", &lri) {
+	if locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "unknown-uid",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", false, true)
 	}
 
-	if !locker.ll.removeEntry("name", "0123-4567", &lri) {
+	if !locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "0123-4567",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", true, false)
 	} else {
 		gotLri := locker.ll.lockMap["name"]
@@ -87,7 +97,10 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 		}
 	}
 
-	if !locker.ll.removeEntry("name", "89ab-cdef", &lri) {
+	if !locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "89ab-cdef",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", true, false)
 	} else {
 		gotLri := locker.ll.lockMap["name"]

View File

@@ -32,7 +32,7 @@ import (
 const (
 	// Lock maintenance interval.
-	lockMaintenanceInterval = 1 * time.Minute
+	lockMaintenanceInterval = 30 * time.Second
 
 	// Lock validity check interval.
 	lockValidityCheckInterval = 2 * time.Minute
@@ -64,6 +64,7 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
 func getLockArgs(r *http.Request) (args dsync.LockArgs, err error) {
 	args = dsync.LockArgs{
+		Owner:  r.URL.Query().Get(lockRESTOwner),
 		UID:    r.URL.Query().Get(lockRESTUID),
 		Source: r.URL.Query().Get(lockRESTSource),
 	}
@@ -246,10 +247,28 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 		return nil
 	}
 
+	type nlock struct {
+		locks  int
+		writer bool
+	}
+
+	updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) {
+		nlk, ok := nlripsMap[name]
+		if !ok {
+			nlripsMap[name] = nlock{
+				locks:  1,
+				writer: writer,
+			}
+		} else {
+			nlk.locks++
+			nlripsMap[name] = nlk
+		}
+	}
+
 	// Validate if long lived locks are indeed clean.
 	// Get list of long lived locks to check for staleness.
 	for lendpoint, nlrips := range getLongLivedLocks(interval) {
-		nlripsMap := make(map[string]int, len(nlrips))
+		nlripsMap := make(map[string]nlock, len(nlrips))
 		for _, nlrip := range nlrips {
 			// Locks are only held on first zone, make sure that
 			// we only look for ownership of locks from endpoints
@@ -257,7 +276,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 			for _, endpoint := range globalEndpoints[0].Endpoints {
 				c := newLockAPI(endpoint)
 				if !c.IsOnline() {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 					continue
 				}
@@ -266,18 +285,19 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 				// Call back to original server verify whether the lock is
 				// still active (based on name & uid)
 				expired, err := c.Expired(ctx, dsync.LockArgs{
+					Owner:     nlrip.lri.Owner,
 					UID:       nlrip.lri.UID,
 					Resources: []string{nlrip.name},
 				})
 				cancel()
 				if err != nil {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 					c.Close()
 					continue
 				}
 
 				if !expired {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 				}
 
 				// Close the connection regardless of the call response.
@@ -285,14 +305,13 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 			}
 
 			// Read locks we assume quorum for be N/2 success
-			quorum := objAPI.SetDriveCount() / 2
+			quorum := getReadQuorum(objAPI.SetDriveCount())
 			if nlrip.lri.Writer {
-				// For write locks we need N/2+1 success
-				quorum = objAPI.SetDriveCount()/2 + 1
+				quorum = getWriteQuorum(objAPI.SetDriveCount())
 			}
 
 			// less than the quorum, we have locks expired.
-			if nlripsMap[nlrip.name] < quorum {
+			if nlripsMap[nlrip.name].locks < quorum {
 				// The lock is no longer active at server that originated
 				// the lock, attempt to remove the lock.
 				globalLockServers[lendpoint].mutex.Lock()
@@ -348,7 +367,6 @@ func startLockMaintenance(ctx context.Context) {
 
 // registerLockRESTHandlers - register lock rest router.
 func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
-	queries := restQueries(lockRESTUID, lockRESTSource)
 	for _, ep := range endpointZones {
 		for _, endpoint := range ep.Endpoints {
 			if !endpoint.IsLocal {
@@ -361,11 +379,11 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
 			subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter()
 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)).Queries(queries...)
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
 
 			globalLockServers[endpoint] = lockServer.ll
 		}

View File

@@ -28,7 +28,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/minio/minio/cmd/config/storageclass"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/lsync"
@@ -148,17 +147,8 @@ func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error)
 	lockSource := getSource(2)
 	start := UTCNow()
 
-	// Lockers default to standard storage class always, why because
-	// we always dictate storage tolerance in terms of standard
-	// storage class be it number of drives or a multiplicative
-	// of number of nodes, defaulting lockers to this value
-	// simply means that locking is always similar in behavior
-	// and effect with erasure coded drive tolerance.
-	tolerance := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-
 	if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, dsync.Options{
 		Timeout: timeout.Timeout(),
-		Tolerance: tolerance,
 	}) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
@@ -177,12 +167,8 @@ func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error
 	lockSource := getSource(2)
 	start := UTCNow()
 
-	// Lockers default to standard storage class always.
-	tolerance := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-
 	if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, dsync.Options{
 		Timeout: timeout.Timeout(),
-		Tolerance: tolerance,
 	}) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
@@ -208,11 +194,11 @@ type localLockInstance struct {
 // NewNSLock - returns a lock instance for a given volume and
 // path. The returned lockInstance object encapsulates the nsLockMap,
 // volume, path and operation ID.
-func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume string, paths ...string) RWLocker {
+func (n *nsLockMap) NewNSLock(ctx context.Context, lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
 	opsID := mustGetUUID()
 	if n.isDistErasure {
 		drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
-			GetLockersFn: lockersFn,
+			GetLockers: lockers,
 		}, pathsJoinPrefix(volume, paths...)...)
 		return &distLockInstance{drwmutex, opsID, ctx}
 	}

View File

@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"net/http"
 	"net/url"
 	"sort"
 	"strings"
@@ -505,36 +506,24 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6
 }
 
 // GetLocks - makes GetLocks RPC call on all peers.
-func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
+func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks {
 	locksResp := make([]*PeerLocks, len(sys.peerClients))
 	g := errgroup.WithNErrs(len(sys.peerClients))
 	for index, client := range sys.peerClients {
-		if client == nil {
-			continue
-		}
 		index := index
 		g.Go(func() error {
-			// Try to fetch serverInfo remotely in three attempts.
-			for i := 0; i < 3; i++ {
-				serverLocksResp, err := sys.peerClients[index].GetLocks()
-				if err == nil {
-					locksResp[index] = &PeerLocks{
-						Addr:  sys.peerClients[index].host.String(),
-						Locks: serverLocksResp,
-					}
-					return nil
-				}
-
-				// Last iteration log the error.
-				if i == 2 {
-					return err
-				}
-
-				// Wait for one second and no need wait after last attempt.
-				if i < 2 {
-					time.Sleep(1 * time.Second)
-				}
-			}
-			return nil
+			if client == nil {
+				return nil
+			}
+			serverLocksResp, err := sys.peerClients[index].GetLocks()
+			if err != nil {
+				return err
+			}
+			locksResp[index] = &PeerLocks{
+				Addr:  sys.peerClients[index].host.String(),
+				Locks: serverLocksResp,
+			}
+			return nil
 		}, index)
 	}
 	for index, err := range g.Wait() {
@@ -543,6 +532,16 @@ func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
 		ctx := logger.SetReqInfo(ctx, reqInfo)
 		logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
 	}
+	// Once we have received all the locks currently used from peers
+	// add the local peer locks list as well.
+	var getRespLocks GetLocksResp
+	for _, llocker := range globalLockServers {
+		getRespLocks = append(getRespLocks, llocker.DupLockMap())
+	}
+	locksResp = append(locksResp, &PeerLocks{
+		Addr:  getHostName(r),
+		Locks: getRespLocks,
+	})
 	return locksResp
 }

View File

@@ -71,8 +71,9 @@ func isLocked(uid string) bool {
 
 // NewDRWMutex - initializes a new dsync RW mutex.
 func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
+	restClnts, _ := clnt.GetLockers()
 	return &DRWMutex{
-		writeLocks: make([]string, len(clnt.GetLockersFn())),
+		writeLocks: make([]string, len(restClnts)),
 		Names:      names,
 		clnt:       clnt,
 	}
@@ -141,28 +142,19 @@ const (
 // algorithm until either the lock is acquired successfully or more
 // time has elapsed than the timeout value.
 func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, _ := dm.clnt.GetLockers()
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	// Create lock array to capture the successful lockers
 	locks := make([]string, len(restClnts))
 
-	cleanLocks := func(locks []string) {
-		for i := range locks {
-			locks[i] = ""
-		}
-	}
-
 	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
 
 	retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
 	defer cancel()
 
 	for {
-		// cleanup any older state, re-use the lock slice.
-		cleanLocks(locks)
-
 		select {
 		case <-retryCtx.Done():
 			log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
@@ -195,8 +187,11 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
 
 // lock tries to acquire the distributed lock, returning true or false.
 func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance int, lockNames ...string) bool {
+	for i := range *locks {
+		(*locks)[i] = ""
+	}
 
-	restClnts := ds.GetLockersFn()
+	restClnts, owner := ds.GetLockers()
 
 	// Tolerance is not set, defaults to half of the locker clients.
 	if tolerance == 0 {
@@ -237,6 +232,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 			}
 
 			args := LockArgs{
+				Owner:     owner,
 				UID:       id,
 				Resources: lockNames,
 				Source:    source,
@@ -293,7 +289,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 					done = true
 					// Increment the number of grants received from the buffered channel.
 					i++
-					releaseAll(ds, locks, isReadLock, restClnts, lockNames...)
+					releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
 				}
 			}
 		case <-timeout:
@@ -302,7 +298,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 			// number of locks to check whether we have quorum or not
 			if !checkQuorumMet(locks, quorum) {
 				log("Quorum not met after timeout\n")
-				releaseAll(ds, locks, isReadLock, restClnts, lockNames...)
+				releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
 			} else {
 				log("Quorum met after timeout\n")
 			}
@@ -327,6 +323,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 		if grantToBeReleased.isLocked() {
 			// release lock
 			sendRelease(ds, restClnts[grantToBeReleased.index],
+				owner,
 				grantToBeReleased.lockUID, isReadLock, lockNames...)
 		}
 	}
@@ -350,10 +347,10 @@ func checkQuorumMet(locks *[]string, quorum int) bool {
 }
 
 // releaseAll releases all locks that are marked as locked
-func releaseAll(ds *Dsync, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) {
+func releaseAll(ds *Dsync, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) {
 	for lock := range restClnts {
 		if isLocked((*locks)[lock]) {
-			sendRelease(ds, restClnts[lock], (*locks)[lock], isReadLock, lockNames...)
+			sendRelease(ds, restClnts[lock], owner, (*locks)[lock], isReadLock, lockNames...)
 			(*locks)[lock] = ""
 		}
 	}
@@ -364,7 +361,7 @@ func releaseAll(ds *Dsync, locks *[]string, isReadLock bool, restClnts []NetLock
 // It is a run-time error if dm is not locked on entry to Unlock.
 func (dm *DRWMutex) Unlock() {
 
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, owner := dm.clnt.GetLockers()
 	// create temp array on stack
 	locks := make([]string, len(restClnts))
@@ -391,7 +388,7 @@ func (dm *DRWMutex) Unlock() {
 	}
 
 	isReadLock := false
-	unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...)
+	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
 }
 
 // RUnlock releases a read lock held on dm.
@@ -400,7 +397,7 @@ func (dm *DRWMutex) Unlock() {
 func (dm *DRWMutex) RUnlock() {
 
 	// create temp array on stack
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, owner := dm.clnt.GetLockers()
 	locks := make([]string, len(restClnts))
 	{
@@ -416,10 +413,10 @@ func (dm *DRWMutex) RUnlock() {
 	}
 
 	isReadLock := true
-	unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...)
+	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
 }
 
-func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) {
+func unlock(ds *Dsync, owner string, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) {
 
 	// We don't need to synchronously wait until we have released all the locks (or the quorum)
 	// (a subsequent lock will retry automatically in case it would fail to get quorum)
@@ -428,19 +425,20 @@ func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, n
 		if isLocked(locks[index]) {
 			// broadcast lock release to all nodes that granted the lock
-			sendRelease(ds, c, locks[index], isReadLock, names...)
+			sendRelease(ds, c, owner, locks[index], isReadLock, names...)
 		}
 	}
 }
 
 // sendRelease sends a release message to a node that previously granted a lock
-func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...string) {
+func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) {
 	if c == nil {
 		log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline"))
 		return
 	}
 
 	args := LockArgs{
+		Owner:     owner,
 		UID:       uid,
 		Resources: names,
 	}

View File

@@ -20,5 +20,5 @@ package dsync
 // authenticated clients, used to initiate lock REST calls.
 type Dsync struct {
 	// List of rest client objects, one per lock server.
-	GetLockersFn func() []NetLocker
+	GetLockers func() ([]NetLocker, string)
 }
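
For clarity only, a minimal sketch (not part of this commit; the helper name and package are illustrative) of how a caller wires the new callback shape, returning both the locker clients and a stable per-instance owner UUID:

package example

import "github.com/minio/minio/pkg/dsync"

// newDsync is an illustrative helper showing the new GetLockers signature:
// the callback now returns the locker clients plus the owner UUID that peers
// use to attribute and expire locks.
func newDsync(lockClients []dsync.NetLocker, owner string) *dsync.Dsync {
	return &dsync.Dsync{
		GetLockers: func() ([]dsync.NetLocker, string) {
			return lockClients, owner
		},
	}
}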

View File

@@ -31,6 +31,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
 	. "github.com/minio/minio/pkg/dsync"
 )
@@ -78,7 +79,7 @@ func TestMain(m *testing.M) {
 	}
 
 	ds = &Dsync{
-		GetLockersFn: func() []NetLocker { return clnts },
+		GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() },
 	}
 
 	startRPCServers(nodes)

View File

@@ -29,6 +29,10 @@ type LockArgs struct {
 	// Source contains the line number, function and file name of the code
 	// on the client node that requested the lock.
 	Source string
+
+	// Owner represents unique ID for this instance, an owner who originally requested
+	// the locked resource, useful primarily in figuring our stale locks.
+	Owner string
 }
 
 // NetLocker is dsync compatible locker interface.

View File

@@ -36,7 +36,9 @@ type LockEntry struct {
 	Type       string   `json:"type"`       // Type indicates if 'Write' or 'Read' lock
 	Source     string   `json:"source"`     // Source at which lock was granted
 	ServerList []string `json:"serverlist"` // List of servers participating in the lock.
+	Owner      string   `json:"owner"`      // Owner UUID indicates server owns the lock.
 	ID         string   `json:"id"`         // UID to uniquely identify request of client.
+	Quorum     int      `json:"quorum"`     // represents quorum number of servers required to hold this lock, used to look for stale locks.
 }
 
 // LockEntries - To sort the locks
@@ -54,13 +56,21 @@ func (l LockEntries) Swap(i, j int) {
 	l[i], l[j] = l[j], l[i]
 }
 
-// TopNLocks - returns the count number of oldest locks currently active on the server.
-func (adm *AdminClient) TopNLocks(ctx context.Context, count int) (LockEntries, error) {
+// TopLockOpts top lock options
+type TopLockOpts struct {
+	Count int
+	Stale bool
+}
+
+// TopLocksWithOpts - returns the count number of oldest locks currently active on the server.
+// additionally we can also enable `stale` to get stale locks currently present on server.
+func (adm *AdminClient) TopLocksWithOpts(ctx context.Context, opts TopLockOpts) (LockEntries, error) {
 	// Execute GET on /minio/admin/v3/top/locks?count=10
 	// to get the 'count' number of oldest locks currently
 	// active on the server.
 	queryVals := make(url.Values)
-	queryVals.Set("count", strconv.Itoa(count))
+	queryVals.Set("count", strconv.Itoa(opts.Count))
+	queryVals.Set("stale", strconv.FormatBool(opts.Stale))
 	resp, err := adm.executeMethod(ctx,
 		http.MethodGet,
 		requestData{
@@ -89,5 +99,5 @@ func (adm *AdminClient) TopNLocks(ctx context.Context, count int) (LockEntries,
 
 // TopLocks - returns top '10' oldest locks currently active on the server.
 func (adm *AdminClient) TopLocks(ctx context.Context) (LockEntries, error) {
-	return adm.TopNLocks(ctx, 10)
+	return adm.TopLocksWithOpts(ctx, TopLockOpts{Count: 10})
 }