diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 3c27b4528..cca8c85e4 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -207,6 +207,22 @@ type ZoneEndpoints struct { // EndpointServerSets - list of list of endpoints type EndpointServerSets []ZoneEndpoints +// Localhost - returns the local hostname from list of endpoints +func (l EndpointServerSets) Localhost() string { + for _, ep := range l { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + u := &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + } + return u.String() + } + } + } + return "" +} + // GetLocalZoneIdx returns the zone which endpoint belongs to locally. // if ep is remote this code will return -1 zoneIndex func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int { diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 4dddc037e..92e0a07b9 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -32,6 +32,7 @@ import ( "github.com/dchest/siphash" "github.com/dustin/go-humanize" "github.com/google/uuid" + "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" @@ -289,10 +290,20 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt // GetAllLockers return a list of all lockers for all sets. func (s *erasureSets) GetAllLockers() []dsync.NetLocker { - allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount) - for i, lockers := range s.erasureLockers { - for j, locker := range lockers { - allLockers[i*s.setDriveCount+j] = locker + var allLockers []dsync.NetLocker + lockEpSet := set.NewStringSet() + for _, lockers := range s.erasureLockers { + for _, locker := range lockers { + if locker == nil || !locker.IsOnline() { + // Skip any offline lockers. + continue + } + if lockEpSet.Contains(locker.String()) { + // Skip duplicate lockers. + continue + } + lockEpSet.Add(locker.String()) + allLockers = append(allLockers, locker) } } return allLockers @@ -300,15 +311,8 @@ func (s *erasureSets) GetAllLockers() []dsync.NetLocker { func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) { return func() ([]dsync.NetLocker, string) { - lockers := make([]dsync.NetLocker, s.setDriveCount) + lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex])) copy(lockers, s.erasureLockers[setIndex]) - sort.Slice(lockers, func(i, j int) bool { - // re-order lockers with affinity for - // - non-local lockers - // - online lockers - // are used first - return !lockers[i].IsLocal() && lockers[i].IsOnline() - }) return lockers, s.erasureLockOwner } } @@ -386,14 +390,24 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto for i := 0; i < setCount; i++ { s.erasureDisks[i] = make([]StorageAPI, setDriveCount) - s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount) + } + + var erasureLockers = map[string]dsync.NetLocker{} + for _, endpoint := range endpoints { + if _, ok := erasureLockers[endpoint.Host]; !ok { + erasureLockers[endpoint.Host] = newLockAPI(endpoint) + } } for i := 0; i < setCount; i++ { + var lockerEpSet = set.NewStringSet() for j := 0; j < setDriveCount; j++ { endpoint := endpoints[i*setDriveCount+j] // Rely on endpoints list to initialize, init lockers and available disks. - s.erasureLockers[i][j] = newLockAPI(endpoint) + if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) { + lockerEpSet.Add(endpoint.Host) + s.erasureLockers[i] = append(s.erasureLockers[i], locker) + } disk := storageDisks[i*setDriveCount+j] if disk == nil { continue diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 187a812c7..1aa21b320 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -46,13 +46,12 @@ func isWriteLock(lri []lockRequesterInfo) bool { // localLocker implements Dsync.NetLocker type localLocker struct { - mutex sync.Mutex - endpoint Endpoint - lockMap map[string][]lockRequesterInfo + mutex sync.Mutex + lockMap map[string][]lockRequesterInfo } func (l *localLocker) String() string { - return l.endpoint.String() + return globalEndpoints.Localhost() } func (l *localLocker) canTakeUnlock(resources ...string) bool { @@ -287,9 +286,8 @@ func (l *localLocker) expireOldLocks(interval time.Duration) { } } -func newLocker(endpoint Endpoint) *localLocker { +func newLocker() *localLocker { return &localLocker{ - endpoint: endpoint, - lockMap: make(map[string][]lockRequesterInfo), + lockMap: make(map[string][]lockRequesterInfo), } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index ce0e85491..a63486db9 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -34,7 +34,7 @@ import ( // lockRESTClient is authenticable lock REST client type lockRESTClient struct { restClient *rest.Client - endpoint Endpoint + u *url.URL } func toLockError(err error) error { @@ -53,7 +53,7 @@ func toLockError(err error) error { // String stringer *dsync.NetLocker* interface compatible method. func (client *lockRESTClient) String() string { - return client.endpoint.String() + return client.u.String() } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected @@ -144,7 +144,7 @@ func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockAr func newLockAPI(endpoint Endpoint) dsync.NetLocker { if endpoint.IsLocal { - return globalLockServers[endpoint] + return globalLockServer } return newlockRESTClient(endpoint) } @@ -154,7 +154,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { serverURL := &url.URL{ Scheme: endpoint.Scheme, Host: endpoint.Host, - Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion), + Path: pathJoin(lockRESTPrefix, lockRESTVersion), } var tlsConfig *tls.Config @@ -179,5 +179,11 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) } - return &lockRESTClient{endpoint: endpoint, restClient: restClient} + return &lockRESTClient{ + u: &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + }, + restClient: restClient, + } } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 4c6df138f..f89192806 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -21,7 +21,6 @@ import ( "context" "errors" "net/http" - "path" "sort" "strconv" "time" @@ -264,37 +263,27 @@ func lockMaintenance(ctx context.Context) { // Reset the timer for next cycle. lkTimer.Reset(lockMaintenanceInterval) - for _, lockServer := range globalLockServers { - lockServer.expireOldLocks(lockValidityDuration) - } + globalLockServer.expireOldLocks(lockValidityDuration) } } } // registerLockRESTHandlers - register lock rest router. -func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) { - for _, ep := range endpointServerSets { - for _, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { - continue - } - - lockServer := &lockRESTServer{ - ll: newLocker(endpoint), - } - - subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter() - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler)) - - globalLockServers[endpoint] = lockServer.ll - } +func registerLockRESTHandlers(router *mux.Router) { + lockServer := &lockRESTServer{ + ll: newLocker(), } + subrouter := router.PathPrefix(lockRESTPrefix).Subrouter() + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler)) + + globalLockServer = lockServer.ll + go lockMaintenance(GlobalContext) } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index b3925745c..05780ad21 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -34,7 +34,7 @@ import ( ) // local lock servers -var globalLockServers = make(map[Endpoint]*localLocker) +var globalLockServer *localLocker // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { diff --git a/cmd/notification.go b/cmd/notification.go index 4093c805f..b4dbefdcd 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -521,10 +521,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe } // Once we have received all the locks currently used from peers // add the local peer locks list as well. - llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } + llockers := GetLocksResp{globalLockServer.DupLockMap()} locksResp = append(locksResp, &PeerLocks{ Addr: getHostName(r), Locks: llockers, diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 8a7eb81bf..6ac60ffb6 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -48,10 +48,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) ctx := newContext(r, w, "GetLocks") - llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } + llockers := GetLocksResp{globalLockServer.DupLockMap()} logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers)) w.(http.Flusher).Flush() diff --git a/cmd/routers.go b/cmd/routers.go index fe47081ff..d0032ac53 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS registerBootstrapRESTHandlers(router) // Register distributed namespace lock routers. - registerLockRESTHandlers(router, endpointServerSets) + registerLockRESTHandlers(router) } // List of some generic handlers which are applied for all incoming requests.