Use one lock server API per node (#13908)

This commit only backports and adapts the change that implements the
per-node locking API.

Before this commit, each disk had its own locker API that had to be
called when locking/unlocking resources. This has performance
implications when a deployment has many disks.

This PR changes locking calls to use a single endpoint per server
instead of an endpoint for each disk. Meaning, to lock a resource,
lock calls are sent once per server rather than once per disk.
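
The core idea, as a minimal sketch (not the actual MinIO code; Endpoint and
hostLocker below are simplified stand-ins for the real types): lock clients
are keyed by host, so a server with many disks is contacted once per lock
call rather than once per disk.

package main

import "fmt"

// Endpoint is a simplified stand-in for a disk endpoint (host + path).
type Endpoint struct {
	Host string
	Path string
}

// hostLocker is a simplified stand-in for a per-host lock REST client.
type hostLocker struct {
	host string
}

// newLockersPerHost builds exactly one locker per distinct host, mirroring
// the idea of this commit: lock calls fan out per server, not per disk.
func newLockersPerHost(endpoints []Endpoint) map[string]*hostLocker {
	lockers := make(map[string]*hostLocker)
	for _, ep := range endpoints {
		if _, ok := lockers[ep.Host]; !ok {
			lockers[ep.Host] = &hostLocker{host: ep.Host}
		}
	}
	return lockers
}

func main() {
	endpoints := []Endpoint{
		{Host: "node1:9000", Path: "/disk1"},
		{Host: "node1:9000", Path: "/disk2"},
		{Host: "node2:9000", Path: "/disk1"},
		{Host: "node2:9000", Path: "/disk2"},
	}
	lockers := newLockersPerHost(endpoints)
	// With the old scheme this would have been 4 lock clients (one per disk);
	// now it is 2 (one per server).
	fmt.Println("lock clients:", len(lockers))
}
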
Anis Elleuch 2021-12-14 18:25:45 +01:00 committed by Minio Trusted
parent 287829c4a8
commit e6df34175c
9 changed files with 79 additions and 62 deletions

View File

@@ -207,6 +207,22 @@ type ZoneEndpoints struct {
 // EndpointServerSets - list of list of endpoints
 type EndpointServerSets []ZoneEndpoints
 
+// Localhost - returns the local hostname from list of endpoints
+func (l EndpointServerSets) Localhost() string {
+	for _, ep := range l {
+		for _, endpoint := range ep.Endpoints {
+			if endpoint.IsLocal {
+				u := &url.URL{
+					Scheme: endpoint.Scheme,
+					Host:   endpoint.Host,
+				}
+				return u.String()
+			}
+		}
+	}
+	return ""
+}
+
 // GetLocalZoneIdx returns the zone which endpoint belongs to locally.
 // if ep is remote this code will return -1 zoneIndex
 func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int {

View File

@@ -32,6 +32,7 @@ import (
 	"github.com/dchest/siphash"
 	"github.com/dustin/go-humanize"
 	"github.com/google/uuid"
+	"github.com/minio/minio-go/v7/pkg/set"
 	"github.com/minio/minio-go/v7/pkg/tags"
 	"github.com/minio/minio/cmd/config"
 	"github.com/minio/minio/cmd/logger"
@@ -289,10 +290,20 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
 // GetAllLockers return a list of all lockers for all sets.
 func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
-	allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount)
-	for i, lockers := range s.erasureLockers {
-		for j, locker := range lockers {
-			allLockers[i*s.setDriveCount+j] = locker
+	var allLockers []dsync.NetLocker
+	lockEpSet := set.NewStringSet()
+	for _, lockers := range s.erasureLockers {
+		for _, locker := range lockers {
+			if locker == nil || !locker.IsOnline() {
+				// Skip any offline lockers.
+				continue
+			}
+			if lockEpSet.Contains(locker.String()) {
+				// Skip duplicate lockers.
+				continue
+			}
+			lockEpSet.Add(locker.String())
+			allLockers = append(allLockers, locker)
 		}
 	}
 	return allLockers
@@ -300,15 +311,8 @@ func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
 func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
 	return func() ([]dsync.NetLocker, string) {
-		lockers := make([]dsync.NetLocker, s.setDriveCount)
+		lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex]))
 		copy(lockers, s.erasureLockers[setIndex])
-		sort.Slice(lockers, func(i, j int) bool {
-			// re-order lockers with affinity for
-			// - non-local lockers
-			// - online lockers
-			// are used first
-			return !lockers[i].IsLocal() && lockers[i].IsOnline()
-		})
 		return lockers, s.erasureLockOwner
 	}
 }
@@ -386,14 +390,24 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 	for i := 0; i < setCount; i++ {
 		s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
-		s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount)
+	}
+
+	var erasureLockers = map[string]dsync.NetLocker{}
+	for _, endpoint := range endpoints {
+		if _, ok := erasureLockers[endpoint.Host]; !ok {
+			erasureLockers[endpoint.Host] = newLockAPI(endpoint)
+		}
 	}
 
 	for i := 0; i < setCount; i++ {
+		var lockerEpSet = set.NewStringSet()
 		for j := 0; j < setDriveCount; j++ {
 			endpoint := endpoints[i*setDriveCount+j]
 			// Rely on endpoints list to initialize, init lockers and available disks.
-			s.erasureLockers[i][j] = newLockAPI(endpoint)
+			if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
+				lockerEpSet.Add(endpoint.Host)
+				s.erasureLockers[i] = append(s.erasureLockers[i], locker)
+			}
 
 			disk := storageDisks[i*setDriveCount+j]
 			if disk == nil {
 				continue

View File

@@ -46,13 +46,12 @@ func isWriteLock(lri []lockRequesterInfo) bool {
 // localLocker implements Dsync.NetLocker
 type localLocker struct {
-	mutex    sync.Mutex
-	endpoint Endpoint
-	lockMap  map[string][]lockRequesterInfo
+	mutex   sync.Mutex
+	lockMap map[string][]lockRequesterInfo
 }
 
 func (l *localLocker) String() string {
-	return l.endpoint.String()
+	return globalEndpoints.Localhost()
 }
 
 func (l *localLocker) canTakeUnlock(resources ...string) bool {
@@ -287,9 +286,8 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
 	}
 }
 
-func newLocker(endpoint Endpoint) *localLocker {
+func newLocker() *localLocker {
 	return &localLocker{
-		endpoint: endpoint,
-		lockMap:  make(map[string][]lockRequesterInfo),
+		lockMap: make(map[string][]lockRequesterInfo),
 	}
 }

View File

@@ -34,7 +34,7 @@ import (
 // lockRESTClient is authenticable lock REST client
 type lockRESTClient struct {
 	restClient *rest.Client
-	endpoint   Endpoint
+	u          *url.URL
 }
 
 func toLockError(err error) error {
@@ -53,7 +53,7 @@ func toLockError(err error) error {
 // String stringer *dsync.NetLocker* interface compatible method.
 func (client *lockRESTClient) String() string {
-	return client.endpoint.String()
+	return client.u.String()
 }
 
 // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
@@ -144,7 +144,7 @@ func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockAr
 func newLockAPI(endpoint Endpoint) dsync.NetLocker {
 	if endpoint.IsLocal {
-		return globalLockServers[endpoint]
+		return globalLockServer
 	}
 	return newlockRESTClient(endpoint)
 }
@@ -154,7 +154,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
 	serverURL := &url.URL{
 		Scheme: endpoint.Scheme,
 		Host:   endpoint.Host,
-		Path:   pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion),
+		Path:   pathJoin(lockRESTPrefix, lockRESTVersion),
 	}
 
 	var tlsConfig *tls.Config
@@ -179,5 +179,11 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
 		return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
 	}
 
-	return &lockRESTClient{endpoint: endpoint, restClient: restClient}
+	return &lockRESTClient{
+		u: &url.URL{
+			Scheme: endpoint.Scheme,
+			Host:   endpoint.Host,
+		},
+		restClient: restClient,
+	}
 }

View File

@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"net/http"
-	"path"
 	"sort"
 	"strconv"
 	"time"
@@ -264,37 +263,27 @@ func lockMaintenance(ctx context.Context) {
 			// Reset the timer for next cycle.
 			lkTimer.Reset(lockMaintenanceInterval)
 
-			for _, lockServer := range globalLockServers {
-				lockServer.expireOldLocks(lockValidityDuration)
-			}
+			globalLockServer.expireOldLocks(lockValidityDuration)
 		}
 	}
 }
 
 // registerLockRESTHandlers - register lock rest router.
-func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) {
-	for _, ep := range endpointServerSets {
-		for _, endpoint := range ep.Endpoints {
-			if !endpoint.IsLocal {
-				continue
-			}
-
-			lockServer := &lockRESTServer{
-				ll: newLocker(endpoint),
-			}
-
-			subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter()
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler))
-
-			globalLockServers[endpoint] = lockServer.ll
-		}
+func registerLockRESTHandlers(router *mux.Router) {
+	lockServer := &lockRESTServer{
+		ll: newLocker(),
 	}
 
+	subrouter := router.PathPrefix(lockRESTPrefix).Subrouter()
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
+	subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler))
+
+	globalLockServer = lockServer.ll
+
 	go lockMaintenance(GlobalContext)
 }

View File

@ -34,7 +34,7 @@ import (
) )
// local lock servers // local lock servers
var globalLockServers = make(map[Endpoint]*localLocker) var globalLockServer *localLocker
// RWLocker - locker interface to introduce GetRLock, RUnlock. // RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface { type RWLocker interface {

View File

@@ -521,10 +521,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
 	}
 	// Once we have received all the locks currently used from peers
 	// add the local peer locks list as well.
-	llockers := make(GetLocksResp, 0, len(globalLockServers))
-	for _, llocker := range globalLockServers {
-		llockers = append(llockers, llocker.DupLockMap())
-	}
+	llockers := GetLocksResp{globalLockServer.DupLockMap()}
 	locksResp = append(locksResp, &PeerLocks{
 		Addr:  getHostName(r),
 		Locks: llockers,

View File

@@ -48,10 +48,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request)
 	ctx := newContext(r, w, "GetLocks")
 
-	llockers := make(GetLocksResp, 0, len(globalLockServers))
-	for _, llocker := range globalLockServers {
-		llockers = append(llockers, llocker.DupLockMap())
-	}
+	llockers := GetLocksResp{globalLockServer.DupLockMap()}
 
 	logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers))
 	w.(http.Flusher).Flush()

View File

@@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS
 	registerBootstrapRESTHandlers(router)
 
 	// Register distributed namespace lock routers.
-	registerLockRESTHandlers(router, endpointServerSets)
+	registerLockRESTHandlers(router)
 }
 
 // List of some generic handlers which are applied for all incoming requests.