diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go index dfb438477..2fbb12258 100644 --- a/internal/dsync/drwmutex.go +++ b/internal/dsync/drwmutex.go @@ -63,8 +63,9 @@ const drwMutexInfinite = 1<<63 - 1 // A DRWMutex is a distributed mutual exclusion lock. type DRWMutex struct { Names []string - writeLocks []string // Array of nodes that granted a write lock - readersLocks [][]string // Array of array of nodes that granted reader locks + writeLocks []string // Array of nodes that granted a write lock + readLocks []string // Array of array of nodes that granted reader locks + rng *rand.Rand m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node clnt *Dsync cancelRefresh context.CancelFunc @@ -90,8 +91,10 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex { sort.Strings(names) return &DRWMutex{ writeLocks: make([]string, len(restClnts)), + readLocks: make([]string, len(restClnts)), Names: names, clnt: clnt, + rng: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}), } } @@ -159,8 +162,6 @@ const ( func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), id, source string, isReadLock bool, opts Options) (locked bool) { restClnts, _ := dm.clnt.GetLockers() - r := rand.New(rand.NewSource(time.Now().UnixNano())) - // Create lock array to capture the successful lockers locks := make([]string, len(restClnts)) @@ -198,10 +199,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i // If success, copy array to object if isReadLock { - // Append new array of strings at the end - dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts))) - // and copy stack array into last spot - copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:]) + copy(dm.readLocks, locks[:]) } else { copy(dm.writeLocks, locks[:]) } @@ -215,7 +213,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i return locked } - time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) + time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval))) } } } @@ -538,10 +536,8 @@ func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadL wg.Add(1) go func(lockID int) { defer wg.Done() - if isLocked((*locks)[lockID]) { - if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) { - (*locks)[lockID] = "" - } + if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, names...) { + (*locks)[lockID] = "" } }(lockID) } @@ -590,9 +586,8 @@ func (dm *DRWMutex) Unlock() { tolerance := len(restClnts) / 2 isReadLock := false - r := rand.New(rand.NewSource(time.Now().UnixNano())) for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { - time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) + time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval))) } } @@ -604,29 +599,36 @@ func (dm *DRWMutex) RUnlock() { dm.cancelRefresh() dm.m.Unlock() - // create temp array on stack restClnts, owner := dm.clnt.GetLockers() - + // create temp array on stack locks := make([]string, len(restClnts)) + { dm.m.Lock() defer dm.m.Unlock() - if len(dm.readersLocks) == 0 { + + // Check if minimally a single bool is set in the writeLocks array + lockFound := false + for _, uid := range dm.readLocks { + if isLocked(uid) { + lockFound = true + break + } + } + if !lockFound { panic("Trying to RUnlock() while no RLock() is active") } - // Copy out first element to release it first (FIFO) - copy(locks, dm.readersLocks[0][:]) - // Drop first element from array - dm.readersLocks = dm.readersLocks[1:] + + // Copy write locks to stack array + copy(locks, dm.readLocks[:]) } // Tolerance is not set, defaults to half of the locker clients. tolerance := len(restClnts) / 2 isReadLock := true - r := rand.New(rand.NewSource(time.Now().UnixNano())) for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) { - time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) + time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval))) } } @@ -637,6 +639,10 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo return false } + if len(uid) == 0 { + return false + } + args := LockArgs{ Owner: owner, UID: uid, diff --git a/internal/dsync/drwmutex_test.go b/internal/dsync/drwmutex_test.go index 0038b008e..ce27d4df7 100644 --- a/internal/dsync/drwmutex_test.go +++ b/internal/dsync/drwmutex_test.go @@ -306,6 +306,9 @@ func TestRUnlockPanic2(t *testing.T) { // Borrowed from rwmutex_test.go func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { + b.ResetTimer() + b.ReportAllocs() + rwm := NewDRWMutex(ds, "test") b.RunParallel(func(pb *testing.PB) { foo := 0 diff --git a/internal/dsync/dsync_test.go b/internal/dsync/dsync_test.go index 853d8e9df..0b25fa896 100644 --- a/internal/dsync/dsync_test.go +++ b/internal/dsync/dsync_test.go @@ -325,6 +325,9 @@ func TestMutex(t *testing.T) { } func BenchmarkMutexUncontended(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + type PaddedMutex struct { *DRWMutex } @@ -338,6 +341,9 @@ func BenchmarkMutexUncontended(b *testing.B) { } func benchmarkMutex(b *testing.B, slack, work bool) { + b.ResetTimer() + b.ReportAllocs() + mu := NewDRWMutex(ds, "") if slack { b.SetParallelism(10) @@ -375,6 +381,9 @@ func BenchmarkMutexWorkSlack(b *testing.B) { } func BenchmarkMutexNoSpin(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + // This benchmark models a situation where spinning in the mutex should be // non-profitable and allows to confirm that spinning does not do harm. // To achieve this we create excess of goroutines most of which do local work. @@ -409,6 +418,9 @@ func BenchmarkMutexNoSpin(b *testing.B) { } func BenchmarkMutexSpin(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + // This benchmark models a situation where spinning in the mutex should be // profitable. To achieve this we create a goroutine per-proc. // These goroutines access considerable amount of local data so that diff --git a/internal/dsync/locked_rand.go b/internal/dsync/locked_rand.go new file mode 100644 index 000000000..4c728ba43 --- /dev/null +++ b/internal/dsync/locked_rand.go @@ -0,0 +1,45 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package dsync + +import ( + "math/rand" + "sync" +) + +// lockedRandSource provides protected rand source, implements rand.Source interface. +type lockedRandSource struct { + lk sync.Mutex + src rand.Source +} + +// Int63 returns a non-negative pseudo-random 63-bit integer as an int64. +func (r *lockedRandSource) Int63() (n int64) { + r.lk.Lock() + n = r.src.Int63() + r.lk.Unlock() + return +} + +// Seed uses the provided seed value to initialize the generator to a +// deterministic state. +func (r *lockedRandSource) Seed(seed int64) { + r.lk.Lock() + r.src.Seed(seed) + r.lk.Unlock() +}