From e6df34175cf0ab59da5eec758a3cbd55c3f33f78 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Tue, 14 Dec 2021 18:25:45 +0100 Subject: [PATCH] Use one lock server API per node (#13908) This commit is only backporting and adapting to implement per node locking API Before this commit, each disk has its own locker API that needs to be called when locking/unlocking resources. This has a performance implication if one deployment has many disks. This PR changes locking calls to use one single endpoint per server instead of an endpoint for each disk. Meaning, if you want to lock a resource, lock calls will be sent per server and not per disk. --- cmd/endpoint.go | 16 ++++++++++++++++ cmd/erasure-sets.go | 42 +++++++++++++++++++++++++++-------------- cmd/local-locker.go | 12 +++++------- cmd/lock-rest-client.go | 16 +++++++++++----- cmd/lock-rest-server.go | 41 +++++++++++++++------------------------- cmd/namespace-lock.go | 2 +- cmd/notification.go | 5 +---- cmd/peer-rest-server.go | 5 +---- cmd/routers.go | 2 +- 9 files changed, 79 insertions(+), 62 deletions(-) diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 3c27b4528..cca8c85e4 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -207,6 +207,22 @@ type ZoneEndpoints struct { // EndpointServerSets - list of list of endpoints type EndpointServerSets []ZoneEndpoints +// Localhost - returns the local hostname from list of endpoints +func (l EndpointServerSets) Localhost() string { + for _, ep := range l { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + u := &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + } + return u.String() + } + } + } + return "" +} + // GetLocalZoneIdx returns the zone which endpoint belongs to locally. 
// if ep is remote this code will return -1 zoneIndex func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int { diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 4dddc037e..92e0a07b9 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -32,6 +32,7 @@ import ( "github.com/dchest/siphash" "github.com/dustin/go-humanize" "github.com/google/uuid" + "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" @@ -289,10 +290,20 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt // GetAllLockers return a list of all lockers for all sets. func (s *erasureSets) GetAllLockers() []dsync.NetLocker { - allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount) - for i, lockers := range s.erasureLockers { - for j, locker := range lockers { - allLockers[i*s.setDriveCount+j] = locker + var allLockers []dsync.NetLocker + lockEpSet := set.NewStringSet() + for _, lockers := range s.erasureLockers { + for _, locker := range lockers { + if locker == nil || !locker.IsOnline() { + // Skip any offline lockers. + continue + } + if lockEpSet.Contains(locker.String()) { + // Skip duplicate lockers. 
+ continue + } + lockEpSet.Add(locker.String()) + allLockers = append(allLockers, locker) } } return allLockers @@ -300,15 +311,8 @@ func (s *erasureSets) GetAllLockers() []dsync.NetLocker { func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) { return func() ([]dsync.NetLocker, string) { - lockers := make([]dsync.NetLocker, s.setDriveCount) + lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex])) copy(lockers, s.erasureLockers[setIndex]) - sort.Slice(lockers, func(i, j int) bool { - // re-order lockers with affinity for - // - non-local lockers - // - online lockers - // are used first - return !lockers[i].IsLocal() && lockers[i].IsOnline() - }) return lockers, s.erasureLockOwner } } @@ -386,14 +390,24 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto for i := 0; i < setCount; i++ { s.erasureDisks[i] = make([]StorageAPI, setDriveCount) - s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount) + } + + var erasureLockers = map[string]dsync.NetLocker{} + for _, endpoint := range endpoints { + if _, ok := erasureLockers[endpoint.Host]; !ok { + erasureLockers[endpoint.Host] = newLockAPI(endpoint) + } } for i := 0; i < setCount; i++ { + var lockerEpSet = set.NewStringSet() for j := 0; j < setDriveCount; j++ { endpoint := endpoints[i*setDriveCount+j] // Rely on endpoints list to initialize, init lockers and available disks. 
- s.erasureLockers[i][j] = newLockAPI(endpoint) + if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) { + lockerEpSet.Add(endpoint.Host) + s.erasureLockers[i] = append(s.erasureLockers[i], locker) + } disk := storageDisks[i*setDriveCount+j] if disk == nil { continue diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 187a812c7..1aa21b320 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -46,13 +46,12 @@ func isWriteLock(lri []lockRequesterInfo) bool { // localLocker implements Dsync.NetLocker type localLocker struct { - mutex sync.Mutex - endpoint Endpoint - lockMap map[string][]lockRequesterInfo + mutex sync.Mutex + lockMap map[string][]lockRequesterInfo } func (l *localLocker) String() string { - return l.endpoint.String() + return globalEndpoints.Localhost() } func (l *localLocker) canTakeUnlock(resources ...string) bool { @@ -287,9 +286,8 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { } } -func newLocker(endpoint Endpoint) *localLocker { +func newLocker() *localLocker { return &localLocker{ - endpoint: endpoint, - lockMap: make(map[string][]lockRequesterInfo), + lockMap: make(map[string][]lockRequesterInfo), } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index ce0e85491..a63486db9 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -34,7 +34,7 @@ import ( // lockRESTClient is authenticable lock REST client type lockRESTClient struct { restClient *rest.Client - endpoint Endpoint + u *url.URL } func toLockError(err error) error { @@ -53,7 +53,7 @@ func toLockError(err error) error { // String stringer *dsync.NetLocker* interface compatible method. 
func (client *lockRESTClient) String() string { - return client.endpoint.String() + return client.u.String() } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected @@ -144,7 +144,7 @@ func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockAr func newLockAPI(endpoint Endpoint) dsync.NetLocker { if endpoint.IsLocal { - return globalLockServers[endpoint] + return globalLockServer } return newlockRESTClient(endpoint) } @@ -154,7 +154,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { serverURL := &url.URL{ Scheme: endpoint.Scheme, Host: endpoint.Host, - Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion), + Path: pathJoin(lockRESTPrefix, lockRESTVersion), } var tlsConfig *tls.Config @@ -179,5 +179,11 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) } - return &lockRESTClient{endpoint: endpoint, restClient: restClient} + return &lockRESTClient{ + u: &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + }, + restClient: restClient, + } } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 4c6df138f..f89192806 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -21,7 +21,6 @@ import ( "context" "errors" "net/http" - "path" "sort" "strconv" "time" @@ -264,37 +263,27 @@ func lockMaintenance(ctx context.Context) { // Reset the timer for next cycle. lkTimer.Reset(lockMaintenanceInterval) - for _, lockServer := range globalLockServers { - lockServer.expireOldLocks(lockValidityDuration) - } + globalLockServer.expireOldLocks(lockValidityDuration) } } } // registerLockRESTHandlers - register lock rest router. 
-func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) { - for _, ep := range endpointServerSets { - for _, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { - continue - } - - lockServer := &lockRESTServer{ - ll: newLocker(endpoint), - } - - subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter() - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler)) - - globalLockServers[endpoint] = lockServer.ll - } +func registerLockRESTHandlers(router *mux.Router) { + lockServer := &lockRESTServer{ + ll: newLocker(), } + subrouter := router.PathPrefix(lockRESTPrefix).Subrouter() + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + 
lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler)) + + globalLockServer = lockServer.ll + go lockMaintenance(GlobalContext) } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index b3925745c..05780ad21 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -34,7 +34,7 @@ import ( ) // local lock servers -var globalLockServers = make(map[Endpoint]*localLocker) +var globalLockServer *localLocker // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { diff --git a/cmd/notification.go b/cmd/notification.go index 4093c805f..b4dbefdcd 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -521,10 +521,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe } // Once we have received all the locks currently used from peers // add the local peer locks list as well. 
- llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } + llockers := GetLocksResp{globalLockServer.DupLockMap()} locksResp = append(locksResp, &PeerLocks{ Addr: getHostName(r), Locks: llockers, diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 8a7eb81bf..6ac60ffb6 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -48,10 +48,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) ctx := newContext(r, w, "GetLocks") - llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } + llockers := GetLocksResp{globalLockServer.DupLockMap()} logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers)) w.(http.Flusher).Flush() diff --git a/cmd/routers.go b/cmd/routers.go index fe47081ff..d0032ac53 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS registerBootstrapRESTHandlers(router) // Register distributed namespace lock routers. - registerLockRESTHandlers(router, endpointServerSets) + registerLockRESTHandlers(router) } // List of some generic handlers which are applied for all incoming requests.