diff --git a/Makefile b/Makefile index ff0455da2..3bf6489b6 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ test-race: verifiers build # Verify minio binary verify: @echo "Verifying build with race" - @GO111MODULE=on CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null + @GO111MODULE=on CGO_ENABLED=1 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null @(env bash $(PWD)/buildscripts/verify-build.sh) # Verify healing of disks with minio binary diff --git a/buildscripts/verify-build.sh b/buildscripts/verify-build.sh index b392593ec..046ca757a 100755 --- a/buildscripts/verify-build.sh +++ b/buildscripts/verify-build.sh @@ -56,7 +56,8 @@ function start_minio_erasure() function start_minio_erasure_sets() { - "${MINIO[@]}" server "${WORK_DIR}/erasure-disk-sets{1...32}" >"$WORK_DIR/erasure-minio-sets.log" 2>&1 & + export MINIO_ENDPOINTS="${WORK_DIR}/erasure-disk-sets{1...32}" + "${MINIO[@]}" server > "$WORK_DIR/erasure-minio-sets.log" 2>&1 & sleep 15 } @@ -64,9 +65,9 @@ function start_minio_pool_erasure_sets() { export MINIO_ACCESS_KEY=$ACCESS_KEY export MINIO_SECRET_KEY=$SECRET_KEY - - "${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-9000.log" 2>&1 & - "${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-9001.log" 2>&1 & + export MINIO_ENDPOINTS="http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4} http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}" + "${MINIO[@]}" server --address ":9000" > "$WORK_DIR/pool-minio-9000.log" 2>&1 & + "${MINIO[@]}" server --address ":9001" > "$WORK_DIR/pool-minio-9001.log" 2>&1 & sleep 40 } @@ -75,9 +76,9 @@ function start_minio_pool_erasure_sets_ipv6() { export MINIO_ACCESS_KEY=$ACCESS_KEY export MINIO_SECRET_KEY=$SECRET_KEY - - "${MINIO[@]}" server --address="[::1]:9000" "http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-ipv6-9000.log" 2>&1 & - "${MINIO[@]}" server --address="[::1]:9001" "http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-ipv6-9001.log" 2>&1 & + export MINIO_ENDPOINTS="http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4} http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}" + "${MINIO[@]}" server --address="[::1]:9000" > "$WORK_DIR/pool-minio-ipv6-9000.log" 2>&1 & + "${MINIO[@]}" server --address="[::1]:9001" > "$WORK_DIR/pool-minio-ipv6-9001.log" 2>&1 & sleep 40 } @@ -86,10 +87,10 @@ function start_minio_dist_erasure() { export MINIO_ACCESS_KEY=$ACCESS_KEY export MINIO_SECRET_KEY=$SECRET_KEY - "${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9000.log" 2>&1 & - "${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9001.log" 2>&1 & - "${MINIO[@]}" server --address=:9002 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9002.log" 2>&1 & - "${MINIO[@]}" server --address=:9003 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9003.log" 2>&1 & + export MINIO_ENDPOINTS="http://127.0.0.1:9000${WORK_DIR}/dist-disk1 http://127.0.0.1:9001${WORK_DIR}/dist-disk2 http://127.0.0.1:9002${WORK_DIR}/dist-disk3 http://127.0.0.1:9003${WORK_DIR}/dist-disk4" + for i in $(seq 0 3); do + "${MINIO[@]}" server --address ":900${i}" > "$WORK_DIR/dist-minio-900${i}.log" 2>&1 & + done sleep 40 } @@ -112,7 +113,8 @@ function run_test_fs() return "$rv" } -function run_test_erasure_sets() { +function run_test_erasure_sets() +{ start_minio_erasure_sets (cd "$WORK_DIR" && "$FUNCTIONAL_TESTS") @@ -220,7 +222,7 @@ function run_test_dist_erasure() rm -f "$WORK_DIR/dist-minio-9000.log" "$WORK_DIR/dist-minio-9001.log" "$WORK_DIR/dist-minio-9002.log" "$WORK_DIR/dist-minio-9003.log" - return "$rv" + return "$rv" } function purge() diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 6b06b122d..95bff82d1 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -373,14 +373,12 @@ func topLockEntries(peerLocks []*PeerLocks, stale bool) madmin.LockEntries { if peerLock == nil { continue } - for _, locks := range peerLock.Locks { - for k, v := range locks { - for _, lockReqInfo := range v { - if val, ok := entryMap[lockReqInfo.UID]; ok { - val.ServerList = append(val.ServerList, peerLock.Addr) - } else { - entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr) - } + for k, v := range peerLock.Locks { + for _, lockReqInfo := range v { + if val, ok := entryMap[lockReqInfo.UID]; ok { + val.ServerList = append(val.ServerList, peerLock.Addr) + } else { + entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr) } } } @@ -402,7 +400,7 @@ func topLockEntries(peerLocks []*PeerLocks, stale bool) madmin.LockEntries { // PeerLocks holds server information result of one node type PeerLocks struct { Addr string - Locks GetLocksResp + Locks map[string][]lockRequesterInfo } // TopLocksHandler Get list of locks in use diff --git a/cmd/endpoint.go b/cmd/endpoint.go index f7ef9ec3c..bfc96d944 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -242,6 +242,22 @@ func (l *EndpointServerPools) Add(zeps ZoneEndpoints) error { return nil } +// Localhost - returns the local hostname from list of endpoints +func (l EndpointServerPools) Localhost() string { + for _, ep := range l { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + u := &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + } + return u.String() + } + } + } + return "" +} + // FirstLocal returns true if the first endpoint is local. func (l EndpointServerPools) FirstLocal() bool { return l[0].Endpoints[0].IsLocal diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index f416f0dc3..8916d207f 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -89,27 +89,12 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri } // Clean-up the old multipart uploads. Should be run in a Go routine. -func (er erasureObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) { - ticker := time.NewTicker(cleanupInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - var disk StorageAPI - // run multiple cleanup's local to this server. - for _, d := range er.getLoadBalancedLocalDisks() { - if d != nil { - disk = d - break - } - } - if disk == nil { - continue - } +func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) { + // run multiple cleanup's local to this server. + for _, disk := range er.getLoadBalancedLocalDisks() { + if disk != nil { er.cleanupStaleUploadsOnDisk(ctx, disk, expiry) + return } } } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 2b3ef75ca..8d6e2a691 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -30,6 +30,7 @@ import ( "github.com/dchest/siphash" "github.com/google/uuid" + "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" @@ -278,10 +279,20 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt // GetAllLockers return a list of all lockers for all sets. func (s *erasureSets) GetAllLockers() []dsync.NetLocker { - allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount) - for i, lockers := range s.erasureLockers { - for j, locker := range lockers { - allLockers[i*s.setDriveCount+j] = locker + var allLockers []dsync.NetLocker + lockEpSet := set.NewStringSet() + for _, lockers := range s.erasureLockers { + for _, locker := range lockers { + if locker == nil || !locker.IsOnline() { + // Skip any offline lockers. + continue + } + if lockEpSet.Contains(locker.String()) { + // Skip duplicate lockers. + continue + } + lockEpSet.Add(locker.String()) + allLockers = append(allLockers, locker) } } return allLockers @@ -289,15 +300,8 @@ func (s *erasureSets) GetAllLockers() []dsync.NetLocker { func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) { return func() ([]dsync.NetLocker, string) { - lockers := make([]dsync.NetLocker, s.setDriveCount) + lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex])) copy(lockers, s.erasureLockers[setIndex]) - sort.Slice(lockers, func(i, j int) bool { - // re-order lockers with affinity for - // - non-local lockers - // - online lockers - // are used first - return !lockers[i].IsLocal() && lockers[i].IsOnline() - }) return lockers, s.erasureLockOwner } } @@ -362,14 +366,24 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto for i := 0; i < setCount; i++ { s.erasureDisks[i] = make([]StorageAPI, setDriveCount) - s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount) + } + + var erasureLockers = map[string]dsync.NetLocker{} + for _, endpoint := range endpoints { + if _, ok := erasureLockers[endpoint.Host]; !ok { + erasureLockers[endpoint.Host] = newLockAPI(endpoint) + } } for i := 0; i < setCount; i++ { + var lockerEpSet = set.NewStringSet() for j := 0; j < setDriveCount; j++ { endpoint := endpoints[i*setDriveCount+j] - // Rely on endpoints list to initialize, init lockers and available disks. - s.erasureLockers[i][j] = newLockAPI(endpoint) + // Only add lockers only one per endpoint and per erasure set. + if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) { + lockerEpSet.Add(endpoint.Host) + s.erasureLockers[i] = append(s.erasureLockers[i], locker) + } disk := storageDisks[i*setDriveCount+j] if disk == nil { continue @@ -396,11 +410,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto bp: bp, mrfOpCh: make(chan partialOperation, 10000), } - - go s.sets[i].cleanupStaleUploads(ctx, - GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry) } + // start cleanup stale uploads go-routine. + go s.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry) + // Start the disk monitoring and connect routine. go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval) go s.maintainMRFList() @@ -409,6 +423,22 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto return s, nil } +func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, set := range s.sets { + set.cleanupStaleUploads(ctx, expiry) + } + } + } +} + // NewNSLock - initialize a new namespace RWLocker instance. func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker { if len(objects) == 1 { diff --git a/cmd/local-locker.go b/cmd/local-locker.go index f1ff8199c..8bdc51c5e 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -46,13 +46,12 @@ func isWriteLock(lri []lockRequesterInfo) bool { // localLocker implements Dsync.NetLocker type localLocker struct { - mutex sync.Mutex - endpoint Endpoint - lockMap map[string][]lockRequesterInfo + mutex sync.Mutex + lockMap map[string][]lockRequesterInfo } func (l *localLocker) String() string { - return l.endpoint.String() + return globalEndpoints.Localhost() } func (l *localLocker) canTakeUnlock(resources ...string) bool { @@ -194,7 +193,7 @@ func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo { l.mutex.Lock() defer l.mutex.Unlock() - lockCopy := make(map[string][]lockRequesterInfo) + lockCopy := map[string][]lockRequesterInfo{} for k, v := range l.lockMap { lockCopy[k] = append(lockCopy[k], v...) } @@ -253,9 +252,8 @@ func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) { } } -func newLocker(endpoint Endpoint) *localLocker { +func newLocker() *localLocker { return &localLocker{ - endpoint: endpoint, - lockMap: make(map[string][]lockRequesterInfo), + lockMap: make(map[string][]lockRequesterInfo), } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 897afcdab..1a4747e6c 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -32,7 +32,7 @@ import ( // lockRESTClient is authenticable lock REST client type lockRESTClient struct { restClient *rest.Client - endpoint Endpoint + u *url.URL } func toLockError(err error) error { @@ -51,7 +51,7 @@ func toLockError(err error) error { // String stringer *dsync.NetLocker* interface compatible method. func (client *lockRESTClient) String() string { - return client.endpoint.String() + return client.u.String() } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected @@ -137,7 +137,7 @@ func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) func newLockAPI(endpoint Endpoint) dsync.NetLocker { if endpoint.IsLocal { - return globalLockServers[endpoint] + return globalLockServer } return newlockRESTClient(endpoint) } @@ -147,7 +147,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { serverURL := &url.URL{ Scheme: endpoint.Scheme, Host: endpoint.Host, - Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion), + Path: pathJoin(lockRESTPrefix, lockRESTVersion), } restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken) @@ -163,5 +163,8 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { return !isNetworkError(err) } - return &lockRESTClient{endpoint: endpoint, restClient: restClient} + return &lockRESTClient{u: &url.URL{ + Scheme: endpoint.Scheme, + Host: endpoint.Host, + }, restClient: restClient} } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 36942fa50..b808bfbf3 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -22,7 +22,6 @@ import ( "errors" "math/rand" "net/http" - "path" "sort" "strconv" "time" @@ -217,27 +216,23 @@ type nameLockRequesterInfoPair struct { // getLongLivedLocks returns locks that are older than a certain time and // have not been 'checked' for validity too soon enough -func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterInfoPair { - nlripMap := make(map[Endpoint][]nameLockRequesterInfoPair) - for endpoint, locker := range globalLockServers { - rslt := []nameLockRequesterInfoPair{} - locker.mutex.Lock() - for name, lriArray := range locker.lockMap { - for idx := range lriArray { - // Check whether enough time has gone by since last check - if time.Since(lriArray[idx].TimeLastCheck) >= interval { - rslt = append(rslt, nameLockRequesterInfoPair{ - name: name, - lri: lriArray[idx], - }) - lriArray[idx].TimeLastCheck = UTCNow() - } +func getLongLivedLocks(interval time.Duration) []nameLockRequesterInfoPair { + nlrip := []nameLockRequesterInfoPair{} + globalLockServer.mutex.Lock() + for name, lriArray := range globalLockServer.lockMap { + for idx := range lriArray { + // Check whether enough time has gone by since last check + if time.Since(lriArray[idx].TimeLastCheck) >= interval { + nlrip = append(nlrip, nameLockRequesterInfoPair{ + name: name, + lri: lriArray[idx], + }) + lriArray[idx].TimeLastCheck = UTCNow() } } - nlripMap[endpoint] = rslt - locker.mutex.Unlock() } - return nlripMap + globalLockServer.mutex.Unlock() + return nlrip } // lockMaintenance loops over locks that have been active for some time and checks back @@ -277,44 +272,36 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error { } } - allLockersFn := z.GetAllLockers - // Validate if long lived locks are indeed clean. // Get list of long lived locks to check for staleness. - for lendpoint, nlrips := range getLongLivedLocks(interval) { - nlripsMap := make(map[string]nlock, len(nlrips)) - for _, nlrip := range nlrips { - for _, c := range allLockersFn() { - if !c.IsOnline() || c == nil { - continue - } + nlrips := getLongLivedLocks(interval) + nlripsMap := make(map[string]nlock, len(nlrips)) + for _, nlrip := range nlrips { + for _, c := range z.GetAllLockers() { + ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - - // Call back to original server verify whether the lock is - // still active (based on name & uid) - expired, err := c.Expired(ctx, dsync.LockArgs{ - Owner: nlrip.lri.Owner, - UID: nlrip.lri.UID, - Resources: []string{nlrip.name}, - }) - cancel() - if err != nil { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - continue - } - - if !expired { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - } + // Call back to original server verify whether the lock is + // still active (based on name & uid) + expired, err := c.Expired(ctx, dsync.LockArgs{ + Owner: nlrip.lri.Owner, + UID: nlrip.lri.UID, + Resources: []string{nlrip.name}, + }) + cancel() + if err != nil { + updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) + continue } - // less than the quorum, we have locks expired. - if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum { - // Purge the stale entry if it exists. - globalLockServers[lendpoint].removeEntryIfExists(nlrip) + if !expired { + updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) } + } + // less than the quorum, we have locks expired. + if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum { + // Purge the stale entry if it exists. + globalLockServer.removeEntryIfExists(nlrip) } } @@ -361,28 +348,20 @@ func startLockMaintenance(ctx context.Context) { } // registerLockRESTHandlers - register lock rest router. -func registerLockRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) { - for _, ep := range endpointServerPools { - for _, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { - continue - } - - lockServer := &lockRESTServer{ - ll: newLocker(endpoint), - } - - subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter() - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)) - - globalLockServers[endpoint] = lockServer.ll - } +func registerLockRESTHandlers(router *mux.Router) { + lockServer := &lockRESTServer{ + ll: newLocker(), } + subrouter := router.PathPrefix(lockRESTPrefix).Subrouter() + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)) + + globalLockServer = lockServer.ll + go startLockMaintenance(GlobalContext) } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index ede34793f..79adb366a 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -34,7 +34,7 @@ import ( ) // local lock servers -var globalLockServers = make(map[Endpoint]*localLocker) +var globalLockServer *localLocker // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { diff --git a/cmd/notification.go b/cmd/notification.go index 528831b68..951c9e87f 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -639,15 +639,9 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String()) } - // Once we have received all the locks currently used from peers - // add the local peer locks list as well. - llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } locksResp = append(locksResp, &PeerLocks{ Addr: getHostName(r), - Locks: llockers, + Locks: globalLockServer.DupLockMap(), }) return locksResp } diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 55232564d..27314a248 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -84,18 +84,16 @@ func (client *peerRESTClient) Close() error { return nil } -// GetLocksResp stores various info from the client for each lock that is requested. -type GetLocksResp []map[string][]lockRequesterInfo - // GetLocks - fetch older locks for a remote node. -func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) { +func (client *peerRESTClient) GetLocks() (lockMap map[string][]lockRequesterInfo, err error) { respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1) if err != nil { return } + lockMap = map[string][]lockRequesterInfo{} defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&locks) - return locks, err + err = gob.NewDecoder(respBody).Decode(&lockMap) + return lockMap, err } // ServerInfo - fetch server information for a remote node. diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ea21774cd..bc9b60d77 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -48,12 +48,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) } ctx := newContext(r, w, "GetLocks") - - llockers := make(GetLocksResp, 0, len(globalLockServers)) - for _, llocker := range globalLockServers { - llockers = append(llockers, llocker.DupLockMap()) - } - logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers)) + logger.LogIf(ctx, gob.NewEncoder(w).Encode(globalLockServer.DupLockMap())) w.(http.Flusher).Flush() diff --git a/cmd/routers.go b/cmd/routers.go index 9cd4f2631..8c993c8e3 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint registerBootstrapRESTHandlers(router) // Register distributed namespace lock routers. - registerLockRESTHandlers(router, endpointServerPools) + registerLockRESTHandlers(router) } // List of some generic handlers which are applied for all incoming requests. diff --git a/pkg/argon2/argon2.go b/pkg/argon2/argon2.go index 308cab532..f823c5ad3 100644 --- a/pkg/argon2/argon2.go +++ b/pkg/argon2/argon2.go @@ -130,21 +130,21 @@ func NewIDKey(time, memory uint32, threads uint8) func([]byte, []byte, []byte, [ pool := sync.Pool{ New: func() interface{} { b := make([]block, memory) - return b + return &b }, } return func(password, salt, secret, data []byte, keyLen uint32) []byte { - B := pool.Get().([]block) + B := pool.Get().(*[]block) defer func() { - clearBlocks(B) + clearBlocks(*B) pool.Put(B) }() h0 := initHash(password, salt, secret, data, time, hashMemory, uint32(threads), keyLen, argon2id) - B = initBlocks(&h0, B, uint32(threads)) - processBlocks(B, time, memory, uint32(threads), argon2id) - return extractKey(B, memory, uint32(threads), keyLen) + B1 := initBlocks(&h0, *B, uint32(threads)) + processBlocks(B1, time, memory, uint32(threads), argon2id) + return extractKey(B1, memory, uint32(threads), keyLen) } } diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index e27542a98..822ee63a5 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -299,7 +299,9 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance if !quorumLocked { log("Releasing all acquired locks now abandoned after quorum was not met\n") - releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) + if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) { + log("Unable to release acquired locks, stale locks might be present\n") + } } // We may have some unused results in ch, release them async. @@ -308,11 +310,10 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is close(ch) for grantToBeReleased := range ch { if grantToBeReleased.isLocked() { - // release lock + // release abandoned lock log("Releasing abandoned lock\n") sendRelease(ds, restClnts[grantToBeReleased.index], - owner, - grantToBeReleased.lockUID, isReadLock, lockNames...) + owner, grantToBeReleased.lockUID, isReadLock, lockNames...) } } }() diff --git a/pkg/madmin/encrypt.go b/pkg/madmin/encrypt.go index 1e582d1ed..debc42150 100644 --- a/pkg/madmin/encrypt.go +++ b/pkg/madmin/encrypt.go @@ -28,7 +28,11 @@ import ( "github.com/secure-io/sio-go/sioutil" ) -var idKey = argon2.NewIDKey(1, 64*1024, 4) +var idKey func([]byte, []byte, []byte, []byte, uint32) []byte + +func init() { + idKey = argon2.NewIDKey(1, 64*1024, 4) +} // EncryptData encrypts the data with an unique key // derived from password using the Argon2id PBKDF.