mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: refactor locks to apply them uniquely per node (#11052)
This refactor is done for few reasons below - to avoid deadlocks in scenarios when number of nodes are smaller < actual erasure stripe count where in N participating local lockers can lead to deadlocks across systems. - avoids expiry routines to run 1000 of separate network operations and routes per disk where as each of them are still accessing one single local entity. - it is ideal to have since globalLockServer per instance. - In a 32node deployment however, each server group is still concentrated towards the same set of lockers that partipicate during the write/read phase, unlike previous minio/dsync implementation - this potentially avoids send 32 requests instead we will still send at max requests of unique nodes participating in a write/read phase. - reduces overall chattiness on smaller setups.
This commit is contained in:
parent
97856bfebf
commit
4550ac6fff
2
Makefile
2
Makefile
@ -57,7 +57,7 @@ test-race: verifiers build
|
||||
# Verify minio binary
|
||||
verify:
|
||||
@echo "Verifying build with race"
|
||||
@GO111MODULE=on CGO_ENABLED=1 go build -race -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||
@GO111MODULE=on CGO_ENABLED=1 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||
@(env bash $(PWD)/buildscripts/verify-build.sh)
|
||||
|
||||
# Verify healing of disks with minio binary
|
||||
|
@ -56,7 +56,8 @@ function start_minio_erasure()
|
||||
|
||||
function start_minio_erasure_sets()
|
||||
{
|
||||
"${MINIO[@]}" server "${WORK_DIR}/erasure-disk-sets{1...32}" >"$WORK_DIR/erasure-minio-sets.log" 2>&1 &
|
||||
export MINIO_ENDPOINTS="${WORK_DIR}/erasure-disk-sets{1...32}"
|
||||
"${MINIO[@]}" server > "$WORK_DIR/erasure-minio-sets.log" 2>&1 &
|
||||
sleep 15
|
||||
}
|
||||
|
||||
@ -64,9 +65,9 @@ function start_minio_pool_erasure_sets()
|
||||
{
|
||||
export MINIO_ACCESS_KEY=$ACCESS_KEY
|
||||
export MINIO_SECRET_KEY=$SECRET_KEY
|
||||
|
||||
"${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-9000.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-9001.log" 2>&1 &
|
||||
export MINIO_ENDPOINTS="http://127.0.0.1:9000${WORK_DIR}/pool-disk-sets{1...4} http://127.0.0.1:9001${WORK_DIR}/pool-disk-sets{5...8}"
|
||||
"${MINIO[@]}" server --address ":9000" > "$WORK_DIR/pool-minio-9000.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address ":9001" > "$WORK_DIR/pool-minio-9001.log" 2>&1 &
|
||||
|
||||
sleep 40
|
||||
}
|
||||
@ -75,9 +76,9 @@ function start_minio_pool_erasure_sets_ipv6()
|
||||
{
|
||||
export MINIO_ACCESS_KEY=$ACCESS_KEY
|
||||
export MINIO_SECRET_KEY=$SECRET_KEY
|
||||
|
||||
"${MINIO[@]}" server --address="[::1]:9000" "http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-ipv6-9000.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address="[::1]:9001" "http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4}" "http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}" >"$WORK_DIR/pool-minio-ipv6-9001.log" 2>&1 &
|
||||
export MINIO_ENDPOINTS="http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4} http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}"
|
||||
"${MINIO[@]}" server --address="[::1]:9000" > "$WORK_DIR/pool-minio-ipv6-9000.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address="[::1]:9001" > "$WORK_DIR/pool-minio-ipv6-9001.log" 2>&1 &
|
||||
|
||||
sleep 40
|
||||
}
|
||||
@ -86,10 +87,10 @@ function start_minio_dist_erasure()
|
||||
{
|
||||
export MINIO_ACCESS_KEY=$ACCESS_KEY
|
||||
export MINIO_SECRET_KEY=$SECRET_KEY
|
||||
"${MINIO[@]}" server --address=:9000 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9000.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address=:9001 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9001.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address=:9002 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9002.log" 2>&1 &
|
||||
"${MINIO[@]}" server --address=:9003 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9003.log" 2>&1 &
|
||||
export MINIO_ENDPOINTS="http://127.0.0.1:9000${WORK_DIR}/dist-disk1 http://127.0.0.1:9001${WORK_DIR}/dist-disk2 http://127.0.0.1:9002${WORK_DIR}/dist-disk3 http://127.0.0.1:9003${WORK_DIR}/dist-disk4"
|
||||
for i in $(seq 0 3); do
|
||||
"${MINIO[@]}" server --address ":900${i}" > "$WORK_DIR/dist-minio-900${i}.log" 2>&1 &
|
||||
done
|
||||
|
||||
sleep 40
|
||||
}
|
||||
@ -112,7 +113,8 @@ function run_test_fs()
|
||||
return "$rv"
|
||||
}
|
||||
|
||||
function run_test_erasure_sets() {
|
||||
function run_test_erasure_sets()
|
||||
{
|
||||
start_minio_erasure_sets
|
||||
|
||||
(cd "$WORK_DIR" && "$FUNCTIONAL_TESTS")
|
||||
@ -220,7 +222,7 @@ function run_test_dist_erasure()
|
||||
|
||||
rm -f "$WORK_DIR/dist-minio-9000.log" "$WORK_DIR/dist-minio-9001.log" "$WORK_DIR/dist-minio-9002.log" "$WORK_DIR/dist-minio-9003.log"
|
||||
|
||||
return "$rv"
|
||||
return "$rv"
|
||||
}
|
||||
|
||||
function purge()
|
||||
|
@ -373,14 +373,12 @@ func topLockEntries(peerLocks []*PeerLocks, stale bool) madmin.LockEntries {
|
||||
if peerLock == nil {
|
||||
continue
|
||||
}
|
||||
for _, locks := range peerLock.Locks {
|
||||
for k, v := range locks {
|
||||
for _, lockReqInfo := range v {
|
||||
if val, ok := entryMap[lockReqInfo.UID]; ok {
|
||||
val.ServerList = append(val.ServerList, peerLock.Addr)
|
||||
} else {
|
||||
entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr)
|
||||
}
|
||||
for k, v := range peerLock.Locks {
|
||||
for _, lockReqInfo := range v {
|
||||
if val, ok := entryMap[lockReqInfo.UID]; ok {
|
||||
val.ServerList = append(val.ServerList, peerLock.Addr)
|
||||
} else {
|
||||
entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -402,7 +400,7 @@ func topLockEntries(peerLocks []*PeerLocks, stale bool) madmin.LockEntries {
|
||||
// PeerLocks holds server information result of one node
|
||||
type PeerLocks struct {
|
||||
Addr string
|
||||
Locks GetLocksResp
|
||||
Locks map[string][]lockRequesterInfo
|
||||
}
|
||||
|
||||
// TopLocksHandler Get list of locks in use
|
||||
|
@ -242,6 +242,22 @@ func (l *EndpointServerPools) Add(zeps ZoneEndpoints) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Localhost - returns the local hostname from list of endpoints
|
||||
func (l EndpointServerPools) Localhost() string {
|
||||
for _, ep := range l {
|
||||
for _, endpoint := range ep.Endpoints {
|
||||
if endpoint.IsLocal {
|
||||
u := &url.URL{
|
||||
Scheme: endpoint.Scheme,
|
||||
Host: endpoint.Host,
|
||||
}
|
||||
return u.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// FirstLocal returns true if the first endpoint is local.
|
||||
func (l EndpointServerPools) FirstLocal() bool {
|
||||
return l[0].Endpoints[0].IsLocal
|
||||
|
@ -89,27 +89,12 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
|
||||
}
|
||||
|
||||
// Clean-up the old multipart uploads. Should be run in a Go routine.
|
||||
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
|
||||
ticker := time.NewTicker(cleanupInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
var disk StorageAPI
|
||||
// run multiple cleanup's local to this server.
|
||||
for _, d := range er.getLoadBalancedLocalDisks() {
|
||||
if d != nil {
|
||||
disk = d
|
||||
break
|
||||
}
|
||||
}
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
|
||||
// run multiple cleanup's local to this server.
|
||||
for _, disk := range er.getLoadBalancedLocalDisks() {
|
||||
if disk != nil {
|
||||
er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
|
||||
"github.com/dchest/siphash"
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
@ -278,10 +279,20 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
|
||||
|
||||
// GetAllLockers return a list of all lockers for all sets.
|
||||
func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
|
||||
allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount)
|
||||
for i, lockers := range s.erasureLockers {
|
||||
for j, locker := range lockers {
|
||||
allLockers[i*s.setDriveCount+j] = locker
|
||||
var allLockers []dsync.NetLocker
|
||||
lockEpSet := set.NewStringSet()
|
||||
for _, lockers := range s.erasureLockers {
|
||||
for _, locker := range lockers {
|
||||
if locker == nil || !locker.IsOnline() {
|
||||
// Skip any offline lockers.
|
||||
continue
|
||||
}
|
||||
if lockEpSet.Contains(locker.String()) {
|
||||
// Skip duplicate lockers.
|
||||
continue
|
||||
}
|
||||
lockEpSet.Add(locker.String())
|
||||
allLockers = append(allLockers, locker)
|
||||
}
|
||||
}
|
||||
return allLockers
|
||||
@ -289,15 +300,8 @@ func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
|
||||
|
||||
func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
|
||||
return func() ([]dsync.NetLocker, string) {
|
||||
lockers := make([]dsync.NetLocker, s.setDriveCount)
|
||||
lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex]))
|
||||
copy(lockers, s.erasureLockers[setIndex])
|
||||
sort.Slice(lockers, func(i, j int) bool {
|
||||
// re-order lockers with affinity for
|
||||
// - non-local lockers
|
||||
// - online lockers
|
||||
// are used first
|
||||
return !lockers[i].IsLocal() && lockers[i].IsOnline()
|
||||
})
|
||||
return lockers, s.erasureLockOwner
|
||||
}
|
||||
}
|
||||
@ -362,14 +366,24 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
|
||||
for i := 0; i < setCount; i++ {
|
||||
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
|
||||
s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount)
|
||||
}
|
||||
|
||||
var erasureLockers = map[string]dsync.NetLocker{}
|
||||
for _, endpoint := range endpoints {
|
||||
if _, ok := erasureLockers[endpoint.Host]; !ok {
|
||||
erasureLockers[endpoint.Host] = newLockAPI(endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < setCount; i++ {
|
||||
var lockerEpSet = set.NewStringSet()
|
||||
for j := 0; j < setDriveCount; j++ {
|
||||
endpoint := endpoints[i*setDriveCount+j]
|
||||
// Rely on endpoints list to initialize, init lockers and available disks.
|
||||
s.erasureLockers[i][j] = newLockAPI(endpoint)
|
||||
// Only add lockers only one per endpoint and per erasure set.
|
||||
if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
|
||||
lockerEpSet.Add(endpoint.Host)
|
||||
s.erasureLockers[i] = append(s.erasureLockers[i], locker)
|
||||
}
|
||||
disk := storageDisks[i*setDriveCount+j]
|
||||
if disk == nil {
|
||||
continue
|
||||
@ -396,11 +410,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
bp: bp,
|
||||
mrfOpCh: make(chan partialOperation, 10000),
|
||||
}
|
||||
|
||||
go s.sets[i].cleanupStaleUploads(ctx,
|
||||
GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
|
||||
}
|
||||
|
||||
// start cleanup stale uploads go-routine.
|
||||
go s.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
|
||||
|
||||
// Start the disk monitoring and connect routine.
|
||||
go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval)
|
||||
go s.maintainMRFList()
|
||||
@ -409,6 +423,22 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
|
||||
ticker := time.NewTicker(cleanupInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
for _, set := range s.sets {
|
||||
set.cleanupStaleUploads(ctx, expiry)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
if len(objects) == 1 {
|
||||
|
@ -46,13 +46,12 @@ func isWriteLock(lri []lockRequesterInfo) bool {
|
||||
|
||||
// localLocker implements Dsync.NetLocker
|
||||
type localLocker struct {
|
||||
mutex sync.Mutex
|
||||
endpoint Endpoint
|
||||
lockMap map[string][]lockRequesterInfo
|
||||
mutex sync.Mutex
|
||||
lockMap map[string][]lockRequesterInfo
|
||||
}
|
||||
|
||||
func (l *localLocker) String() string {
|
||||
return l.endpoint.String()
|
||||
return globalEndpoints.Localhost()
|
||||
}
|
||||
|
||||
func (l *localLocker) canTakeUnlock(resources ...string) bool {
|
||||
@ -194,7 +193,7 @@ func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
lockCopy := make(map[string][]lockRequesterInfo)
|
||||
lockCopy := map[string][]lockRequesterInfo{}
|
||||
for k, v := range l.lockMap {
|
||||
lockCopy[k] = append(lockCopy[k], v...)
|
||||
}
|
||||
@ -253,9 +252,8 @@ func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
|
||||
}
|
||||
}
|
||||
|
||||
func newLocker(endpoint Endpoint) *localLocker {
|
||||
func newLocker() *localLocker {
|
||||
return &localLocker{
|
||||
endpoint: endpoint,
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
lockMap: make(map[string][]lockRequesterInfo),
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ import (
|
||||
// lockRESTClient is authenticable lock REST client
|
||||
type lockRESTClient struct {
|
||||
restClient *rest.Client
|
||||
endpoint Endpoint
|
||||
u *url.URL
|
||||
}
|
||||
|
||||
func toLockError(err error) error {
|
||||
@ -51,7 +51,7 @@ func toLockError(err error) error {
|
||||
|
||||
// String stringer *dsync.NetLocker* interface compatible method.
|
||||
func (client *lockRESTClient) String() string {
|
||||
return client.endpoint.String()
|
||||
return client.u.String()
|
||||
}
|
||||
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
||||
@ -137,7 +137,7 @@ func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs)
|
||||
|
||||
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
|
||||
if endpoint.IsLocal {
|
||||
return globalLockServers[endpoint]
|
||||
return globalLockServer
|
||||
}
|
||||
return newlockRESTClient(endpoint)
|
||||
}
|
||||
@ -147,7 +147,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
|
||||
serverURL := &url.URL{
|
||||
Scheme: endpoint.Scheme,
|
||||
Host: endpoint.Host,
|
||||
Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion),
|
||||
Path: pathJoin(lockRESTPrefix, lockRESTVersion),
|
||||
}
|
||||
|
||||
restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken)
|
||||
@ -163,5 +163,8 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
|
||||
return !isNetworkError(err)
|
||||
}
|
||||
|
||||
return &lockRESTClient{endpoint: endpoint, restClient: restClient}
|
||||
return &lockRESTClient{u: &url.URL{
|
||||
Scheme: endpoint.Scheme,
|
||||
Host: endpoint.Host,
|
||||
}, restClient: restClient}
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -217,27 +216,23 @@ type nameLockRequesterInfoPair struct {
|
||||
|
||||
// getLongLivedLocks returns locks that are older than a certain time and
|
||||
// have not been 'checked' for validity too soon enough
|
||||
func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterInfoPair {
|
||||
nlripMap := make(map[Endpoint][]nameLockRequesterInfoPair)
|
||||
for endpoint, locker := range globalLockServers {
|
||||
rslt := []nameLockRequesterInfoPair{}
|
||||
locker.mutex.Lock()
|
||||
for name, lriArray := range locker.lockMap {
|
||||
for idx := range lriArray {
|
||||
// Check whether enough time has gone by since last check
|
||||
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
|
||||
rslt = append(rslt, nameLockRequesterInfoPair{
|
||||
name: name,
|
||||
lri: lriArray[idx],
|
||||
})
|
||||
lriArray[idx].TimeLastCheck = UTCNow()
|
||||
}
|
||||
func getLongLivedLocks(interval time.Duration) []nameLockRequesterInfoPair {
|
||||
nlrip := []nameLockRequesterInfoPair{}
|
||||
globalLockServer.mutex.Lock()
|
||||
for name, lriArray := range globalLockServer.lockMap {
|
||||
for idx := range lriArray {
|
||||
// Check whether enough time has gone by since last check
|
||||
if time.Since(lriArray[idx].TimeLastCheck) >= interval {
|
||||
nlrip = append(nlrip, nameLockRequesterInfoPair{
|
||||
name: name,
|
||||
lri: lriArray[idx],
|
||||
})
|
||||
lriArray[idx].TimeLastCheck = UTCNow()
|
||||
}
|
||||
}
|
||||
nlripMap[endpoint] = rslt
|
||||
locker.mutex.Unlock()
|
||||
}
|
||||
return nlripMap
|
||||
globalLockServer.mutex.Unlock()
|
||||
return nlrip
|
||||
}
|
||||
|
||||
// lockMaintenance loops over locks that have been active for some time and checks back
|
||||
@ -277,44 +272,36 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
allLockersFn := z.GetAllLockers
|
||||
|
||||
// Validate if long lived locks are indeed clean.
|
||||
// Get list of long lived locks to check for staleness.
|
||||
for lendpoint, nlrips := range getLongLivedLocks(interval) {
|
||||
nlripsMap := make(map[string]nlock, len(nlrips))
|
||||
for _, nlrip := range nlrips {
|
||||
for _, c := range allLockersFn() {
|
||||
if !c.IsOnline() || c == nil {
|
||||
continue
|
||||
}
|
||||
nlrips := getLongLivedLocks(interval)
|
||||
nlripsMap := make(map[string]nlock, len(nlrips))
|
||||
for _, nlrip := range nlrips {
|
||||
for _, c := range z.GetAllLockers() {
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
|
||||
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
|
||||
|
||||
// Call back to original server verify whether the lock is
|
||||
// still active (based on name & uid)
|
||||
expired, err := c.Expired(ctx, dsync.LockArgs{
|
||||
Owner: nlrip.lri.Owner,
|
||||
UID: nlrip.lri.UID,
|
||||
Resources: []string{nlrip.name},
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
continue
|
||||
}
|
||||
|
||||
if !expired {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
}
|
||||
// Call back to original server verify whether the lock is
|
||||
// still active (based on name & uid)
|
||||
expired, err := c.Expired(ctx, dsync.LockArgs{
|
||||
Owner: nlrip.lri.Owner,
|
||||
UID: nlrip.lri.UID,
|
||||
Resources: []string{nlrip.name},
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
continue
|
||||
}
|
||||
|
||||
// less than the quorum, we have locks expired.
|
||||
if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum {
|
||||
// Purge the stale entry if it exists.
|
||||
globalLockServers[lendpoint].removeEntryIfExists(nlrip)
|
||||
if !expired {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
}
|
||||
}
|
||||
|
||||
// less than the quorum, we have locks expired.
|
||||
if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum {
|
||||
// Purge the stale entry if it exists.
|
||||
globalLockServer.removeEntryIfExists(nlrip)
|
||||
}
|
||||
}
|
||||
|
||||
@ -361,28 +348,20 @@ func startLockMaintenance(ctx context.Context) {
|
||||
}
|
||||
|
||||
// registerLockRESTHandlers - register lock rest router.
|
||||
func registerLockRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||
for _, ep := range endpointServerPools {
|
||||
for _, endpoint := range ep.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
|
||||
lockServer := &lockRESTServer{
|
||||
ll: newLocker(endpoint),
|
||||
}
|
||||
|
||||
subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter()
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
|
||||
|
||||
globalLockServers[endpoint] = lockServer.ll
|
||||
}
|
||||
func registerLockRESTHandlers(router *mux.Router) {
|
||||
lockServer := &lockRESTServer{
|
||||
ll: newLocker(),
|
||||
}
|
||||
|
||||
subrouter := router.PathPrefix(lockRESTPrefix).Subrouter()
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
|
||||
|
||||
globalLockServer = lockServer.ll
|
||||
|
||||
go startLockMaintenance(GlobalContext)
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ import (
|
||||
)
|
||||
|
||||
// local lock servers
|
||||
var globalLockServers = make(map[Endpoint]*localLocker)
|
||||
var globalLockServer *localLocker
|
||||
|
||||
// RWLocker - locker interface to introduce GetRLock, RUnlock.
|
||||
type RWLocker interface {
|
||||
|
@ -639,15 +639,9 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
|
||||
ctx := logger.SetReqInfo(ctx, reqInfo)
|
||||
logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
|
||||
}
|
||||
// Once we have received all the locks currently used from peers
|
||||
// add the local peer locks list as well.
|
||||
llockers := make(GetLocksResp, 0, len(globalLockServers))
|
||||
for _, llocker := range globalLockServers {
|
||||
llockers = append(llockers, llocker.DupLockMap())
|
||||
}
|
||||
locksResp = append(locksResp, &PeerLocks{
|
||||
Addr: getHostName(r),
|
||||
Locks: llockers,
|
||||
Locks: globalLockServer.DupLockMap(),
|
||||
})
|
||||
return locksResp
|
||||
}
|
||||
|
@ -84,18 +84,16 @@ func (client *peerRESTClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLocksResp stores various info from the client for each lock that is requested.
|
||||
type GetLocksResp []map[string][]lockRequesterInfo
|
||||
|
||||
// GetLocks - fetch older locks for a remote node.
|
||||
func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
|
||||
func (client *peerRESTClient) GetLocks() (lockMap map[string][]lockRequesterInfo, err error) {
|
||||
respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
lockMap = map[string][]lockRequesterInfo{}
|
||||
defer http.DrainBody(respBody)
|
||||
err = gob.NewDecoder(respBody).Decode(&locks)
|
||||
return locks, err
|
||||
err = gob.NewDecoder(respBody).Decode(&lockMap)
|
||||
return lockMap, err
|
||||
}
|
||||
|
||||
// ServerInfo - fetch server information for a remote node.
|
||||
|
@ -48,12 +48,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
ctx := newContext(r, w, "GetLocks")
|
||||
|
||||
llockers := make(GetLocksResp, 0, len(globalLockServers))
|
||||
for _, llocker := range globalLockServers {
|
||||
llockers = append(llockers, llocker.DupLockMap())
|
||||
}
|
||||
logger.LogIf(ctx, gob.NewEncoder(w).Encode(llockers))
|
||||
logger.LogIf(ctx, gob.NewEncoder(w).Encode(globalLockServer.DupLockMap()))
|
||||
|
||||
w.(http.Flusher).Flush()
|
||||
|
||||
|
@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
|
||||
registerBootstrapRESTHandlers(router)
|
||||
|
||||
// Register distributed namespace lock routers.
|
||||
registerLockRESTHandlers(router, endpointServerPools)
|
||||
registerLockRESTHandlers(router)
|
||||
}
|
||||
|
||||
// List of some generic handlers which are applied for all incoming requests.
|
||||
|
@ -130,21 +130,21 @@ func NewIDKey(time, memory uint32, threads uint8) func([]byte, []byte, []byte, [
|
||||
pool := sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := make([]block, memory)
|
||||
return b
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
return func(password, salt, secret, data []byte, keyLen uint32) []byte {
|
||||
B := pool.Get().([]block)
|
||||
B := pool.Get().(*[]block)
|
||||
defer func() {
|
||||
clearBlocks(B)
|
||||
clearBlocks(*B)
|
||||
pool.Put(B)
|
||||
}()
|
||||
|
||||
h0 := initHash(password, salt, secret, data, time, hashMemory, uint32(threads), keyLen, argon2id)
|
||||
B = initBlocks(&h0, B, uint32(threads))
|
||||
processBlocks(B, time, memory, uint32(threads), argon2id)
|
||||
return extractKey(B, memory, uint32(threads), keyLen)
|
||||
B1 := initBlocks(&h0, *B, uint32(threads))
|
||||
processBlocks(B1, time, memory, uint32(threads), argon2id)
|
||||
return extractKey(B1, memory, uint32(threads), keyLen)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -299,7 +299,9 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
|
||||
quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
|
||||
if !quorumLocked {
|
||||
log("Releasing all acquired locks now abandoned after quorum was not met\n")
|
||||
releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
|
||||
if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) {
|
||||
log("Unable to release acquired locks, stale locks might be present\n")
|
||||
}
|
||||
}
|
||||
|
||||
// We may have some unused results in ch, release them async.
|
||||
@ -308,11 +310,10 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
|
||||
close(ch)
|
||||
for grantToBeReleased := range ch {
|
||||
if grantToBeReleased.isLocked() {
|
||||
// release lock
|
||||
// release abandoned lock
|
||||
log("Releasing abandoned lock\n")
|
||||
sendRelease(ds, restClnts[grantToBeReleased.index],
|
||||
owner,
|
||||
grantToBeReleased.lockUID, isReadLock, lockNames...)
|
||||
owner, grantToBeReleased.lockUID, isReadLock, lockNames...)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -28,7 +28,11 @@ import (
|
||||
"github.com/secure-io/sio-go/sioutil"
|
||||
)
|
||||
|
||||
var idKey = argon2.NewIDKey(1, 64*1024, 4)
|
||||
var idKey func([]byte, []byte, []byte, []byte, uint32) []byte
|
||||
|
||||
func init() {
|
||||
idKey = argon2.NewIDKey(1, 64*1024, 4)
|
||||
}
|
||||
|
||||
// EncryptData encrypts the data with an unique key
|
||||
// derived from password using the Argon2id PBKDF.
|
||||
|
Loading…
Reference in New Issue
Block a user