Implement bucket expansion (#8509)

Harshavardhana
2019-11-19 17:42:27 -08:00
committed by kannappanr
parent 3a34d98db8
commit 347b29d059
63 changed files with 2208 additions and 1166 deletions

View File

@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	golog "log"
+	"math"
 	"math/rand"
 	"os"
 	"path"
@@ -75,7 +76,7 @@ func isLocked(uid string) bool {
 func NewDRWMutex(ctx context.Context, name string, clnt *Dsync) *DRWMutex {
 	return &DRWMutex{
 		Name:       name,
-		writeLocks: make([]string, clnt.dNodeCount),
+		writeLocks: make([]string, len(clnt.GetLockersFn())),
 		clnt:       clnt,
 		ctx:        ctx,
 	}
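With dNodeCount gone, the mutex sizes its bookkeeping from whatever GetLockersFn returns, which is what allows the locker set to grow when a deployment expands. A minimal wiring sketch (myLockers is a hypothetical, server-maintained slice; not part of this commit):

	ds := &Dsync{
		GetLockersFn: func() []NetLocker { return myLockers },
	}
	mu := NewDRWMutex(context.Background(), "my-resource", ds)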
@@ -133,6 +134,8 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea
 	doneCh, start := make(chan struct{}), time.Now().UTC()
 	defer close(doneCh)

+	restClnts := dm.clnt.GetLockersFn()
+
 	// Use incremental back-off algorithm for repeated attempts to acquire the lock
 	for range newRetryTimerSimple(doneCh) {
 		select {
@@ -142,7 +145,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea
 		}

 		// Create temp array on stack.
-		locks := make([]string, dm.clnt.dNodeCount)
+		locks := make([]string, len(restClnts))

 		// Try to acquire the lock.
 		success := lock(dm.clnt, &locks, dm.Name, id, source, isReadLock)
@@ -152,7 +155,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea
 			// If success, copy array to object
 			if isReadLock {
 				// Append new array of strings at the end
-				dm.readersLocks = append(dm.readersLocks, make([]string, dm.clnt.dNodeCount))
+				dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts)))
 				// and copy stack array into last spot
 				copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
 			} else {
@@ -174,12 +177,14 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea
 // lock tries to acquire the distributed lock, returning true or false.
 func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bool) bool {

+	restClnts := ds.GetLockersFn()
+
 	// Create buffered channel of size equal to total number of nodes.
-	ch := make(chan Granted, ds.dNodeCount)
+	ch := make(chan Granted, len(restClnts))
 	defer close(ch)

 	var wg sync.WaitGroup
-	for index, c := range ds.restClnts {
+	for index, c := range restClnts {
 		wg.Add(1)

 		// broadcast lock request to all nodes
@@ -229,7 +234,10 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
 		done := false
 		timeout := time.After(DRWMutexAcquireTimeout)

-		for ; i < ds.dNodeCount; i++ { // Loop until we acquired all locks
+		dquorum := int(len(restClnts)/2) + 1
+		dquorumReads := int(math.Ceil(float64(len(restClnts)) / 2.0))
+
+		for ; i < len(restClnts); i++ { // Loop until we acquired all locks

 			select {
 			case grant := <-ch:
@@ -238,22 +246,22 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
 					(*locks)[grant.index] = grant.lockUID
 				} else {
 					locksFailed++
-					if !isReadLock && locksFailed > ds.dNodeCount-ds.dquorum ||
-						isReadLock && locksFailed > ds.dNodeCount-ds.dquorumReads {
+					if !isReadLock && locksFailed > len(restClnts)-dquorum ||
+						isReadLock && locksFailed > len(restClnts)-dquorumReads {
 						// We know that we are not going to get the lock anymore,
 						// so exit out and release any locks that did get acquired
 						done = true
 						// Increment the number of grants received from the buffered channel.
 						i++
-						releaseAll(ds, locks, lockName, isReadLock)
+						releaseAll(ds, locks, lockName, isReadLock, restClnts)
 					}
 				}
 			case <-timeout:
 				done = true
 				// timeout happened; maybe one of the nodes is slow, so count
 				// the acquired locks to check whether we still have quorum
-				if !quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads) {
-					releaseAll(ds, locks, lockName, isReadLock)
+				if !quorumMet(locks, isReadLock, dquorum, dquorumReads) {
+					releaseAll(ds, locks, lockName, isReadLock, restClnts)
 				}
 			}
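Both quorums are now recomputed on every lock attempt from the current locker count instead of being cached on Dsync. Worked values from the formulas above, for reference:

	n := len(restClnts)
	dquorum := n/2 + 1                               // write quorum: simple majority
	dquorumReads := int(math.Ceil(float64(n) / 2.0)) // read quorum: half, rounded up
	// n=2 -> write 2, read 1
	// n=4 -> write 3, read 2
	// n=5 -> write 3, read 3 (with odd n, both quorums coincide)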
@@ -263,7 +271,7 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
 		}

 		// Count locks in order to determine whether we have quorum or not
-		quorum = quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads)
+		quorum = quorumMet(locks, isReadLock, dquorum, dquorumReads)

 		// Signal that we have the quorum
 		wg.Done()
@@ -271,11 +279,12 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
 		// Wait for the other responses and immediately release the locks
 		// (do not add them to the locks array because the DRWMutex could
 		//  already have been unlocked again by the original calling thread)
-		for ; i < ds.dNodeCount; i++ {
+		for ; i < len(restClnts); i++ {
 			grantToBeReleased := <-ch
 			if grantToBeReleased.isLocked() {
 				// release lock
-				sendRelease(ds, ds.restClnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
+				sendRelease(ds, restClnts[grantToBeReleased.index], lockName,
+					grantToBeReleased.lockUID, isReadLock)
 			}
 		}
 	}(isReadLock)
@@ -306,10 +315,10 @@ func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool {
 }

 // releaseAll releases all locks that are marked as locked
-func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool) {
-	for lock := 0; lock < ds.dNodeCount; lock++ {
+func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool, restClnts []NetLocker) {
+	for lock := 0; lock < len(restClnts); lock++ {
 		if isLocked((*locks)[lock]) {
-			sendRelease(ds, ds.restClnts[lock], lockName, (*locks)[lock], isReadLock)
+			sendRelease(ds, restClnts[lock], lockName, (*locks)[lock], isReadLock)
 			(*locks)[lock] = ""
 		}
 	}
@@ -320,8 +329,9 @@ func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool) {
 // It is a run-time error if dm is not locked on entry to Unlock.
 func (dm *DRWMutex) Unlock() {

+	restClnts := dm.clnt.GetLockersFn()
 	// create temp array on stack
-	locks := make([]string, dm.clnt.dNodeCount)
+	locks := make([]string, len(restClnts))

 	{
 		dm.m.Lock()
@@ -342,11 +352,11 @@ func (dm *DRWMutex) Unlock() {
 		// Copy write locks to stack array
 		copy(locks, dm.writeLocks[:])
 		// Clear write locks array
-		dm.writeLocks = make([]string, dm.clnt.dNodeCount)
+		dm.writeLocks = make([]string, len(restClnts))
 	}

 	isReadLock := false
-	unlock(dm.clnt, locks, dm.Name, isReadLock)
+	unlock(dm.clnt, locks, dm.Name, isReadLock, restClnts)
 }

 // RUnlock releases a read lock held on dm.
@@ -355,8 +365,9 @@ func (dm *DRWMutex) Unlock() {
 func (dm *DRWMutex) RUnlock() {

 	// create temp array on stack
-	locks := make([]string, dm.clnt.dNodeCount)
+	restClnts := dm.clnt.GetLockersFn()
+	locks := make([]string, len(restClnts))

 	{
 		dm.m.Lock()
 		defer dm.m.Unlock()
@@ -370,15 +381,15 @@ func (dm *DRWMutex) RUnlock() {
 	}

 	isReadLock := true
-	unlock(dm.clnt, locks, dm.Name, isReadLock)
+	unlock(dm.clnt, locks, dm.Name, isReadLock, restClnts)
 }

-func unlock(ds *Dsync, locks []string, name string, isReadLock bool) {
+func unlock(ds *Dsync, locks []string, name string, isReadLock bool, restClnts []NetLocker) {

 	// We don't need to synchronously wait until we have released all the locks (or the quorum)
 	// (a subsequent lock will retry automatically in case it would fail to get quorum)

-	for index, c := range ds.restClnts {
+	for index, c := range restClnts {
 		if isLocked(locks[index]) {
 			// broadcast lock release to all nodes that granted the lock

View File

@@ -16,45 +16,9 @@
 package dsync

-import (
-	"errors"
-	"math"
-)
-
 // Dsync represents dsync client object which is initialized with
 // authenticated clients, used to initiate lock REST calls.
 type Dsync struct {
-	// Number of nodes participating in the distributed locking.
-	dNodeCount int
-
-	// List of rest client objects, one per lock server.
-	restClnts []NetLocker
-
-	// Simple majority based quorum, set to dNodeCount/2+1
-	dquorum int
-
-	// Simple quorum for read operations, set to dNodeCount/2
-	dquorumReads int
-}
-
-// New - initializes a new dsync object with input restClnts.
-func New(restClnts []NetLocker) (*Dsync, error) {
-	if len(restClnts) < 2 {
-		return nil, errors.New("Dsync is not designed for less than 2 nodes")
-	} else if len(restClnts) > 32 {
-		return nil, errors.New("Dsync is not designed for more than 32 nodes")
-	}
-
-	ds := &Dsync{}
-	ds.dNodeCount = len(restClnts)
-
-	// With odd number of nodes, write and read quorum is basically the same
-	ds.dquorum = int(ds.dNodeCount/2) + 1
-	ds.dquorumReads = int(math.Ceil(float64(ds.dNodeCount) / 2.0))
-
-	// Initialize node name and rest path for each NetLocker object.
-	ds.restClnts = make([]NetLocker, ds.dNodeCount)
-	copy(ds.restClnts, restClnts)
-
-	return ds, nil
+	// GetLockersFn returns the current list of lock REST clients, one per lock server.
+	GetLockersFn func() []NetLocker
 }
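With New removed, callers construct the Dsync literal directly and own any validation that used to live in the constructor. A sketch of how a caller could keep the old minimum-node guard (newDsync is a hypothetical helper, not part of this commit):

	func newDsync(lockers []NetLocker) (*Dsync, error) {
		if len(lockers) < 2 {
			return nil, errors.New("dsync requires at least 2 lock servers")
		}
		return &Dsync{
			GetLockersFn: func() []NetLocker { return lockers },
		}, nil
	}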

View File

@@ -1,58 +0,0 @@
-/*
- * Minio Cloud Storage, (C) 2018 Minio, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// GOMAXPROCS=10 go test
-
-package dsync
-
-import "testing"
-
-// Tests dsync.New
-func TestNew(t *testing.T) {
-	nclnts := make([]NetLocker, 33)
-	if _, err := New(nclnts); err == nil {
-		t.Fatal("Should have failed")
-	}
-
-	nclnts = make([]NetLocker, 1)
-	if _, err := New(nclnts); err == nil {
-		t.Fatal("Should have failed")
-	}
-
-	nclnts = make([]NetLocker, 2)
-	nds, err := New(nclnts)
-	if err != nil {
-		t.Fatal("Should pass", err)
-	}
-
-	if nds.dquorumReads != 1 {
-		t.Fatalf("Unexpected read quorum values expected 1, got %d", nds.dquorumReads)
-	}
-
-	if nds.dquorum != 2 {
-		t.Fatalf("Unexpected quorum values expected 2, got %d", nds.dquorum)
-	}
-
-	nclnts = make([]NetLocker, 3)
-	nds, err = New(nclnts)
-	if err != nil {
-		t.Fatal("Should pass", err)
-	}
-
-	if nds.dquorumReads != nds.dquorum {
-		t.Fatalf("Unexpected quorum values for odd nodes we expect read %d and write %d quorum to be same", nds.dquorumReads, nds.dquorum)
-	}
-}
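This test is deleted because the dquorum and dquorumReads fields it inspected no longer exist; the arithmetic now lives inline in lock(). If equivalent coverage were wanted, a table-driven sketch against the same formulas could look like this (hypothetical test, assumes "math" and "testing" imports; not part of this commit):

	func TestQuorumMath(t *testing.T) {
		for _, tc := range []struct{ n, write, read int }{
			{2, 2, 1}, // even: read quorum is one below write quorum
			{3, 2, 2}, // odd: read and write quorum coincide
		} {
			write := tc.n/2 + 1
			read := int(math.Ceil(float64(tc.n) / 2.0))
			if write != tc.write || read != tc.read {
				t.Fatalf("n=%d: got write=%d read=%d, want write=%d read=%d",
					tc.n, write, read, tc.write, tc.read)
			}
		}
	}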

View File

@@ -78,10 +78,8 @@ func TestMain(m *testing.M) {
 		clnts = append(clnts, newClient(nodes[i], rpcPaths[i]))
 	}

-	var err error
-	ds, err = New(clnts)
-	if err != nil {
-		log.Fatalf("set nodes failed with %v", err)
+	ds = &Dsync{
+		GetLockersFn: func() []NetLocker { return clnts },
 	}

 	startRPCServers(nodes)
@@ -256,11 +254,10 @@ func TestMutex(t *testing.T) {
 func BenchmarkMutexUncontended(b *testing.B) {
 	type PaddedMutex struct {
-		DRWMutex
-		pad [128]uint8
+		*DRWMutex
 	}
 	b.RunParallel(func(pb *testing.PB) {
-		var mu PaddedMutex
+		var mu = PaddedMutex{NewDRWMutex(context.Background(), "", ds)}
 		for pb.Next() {
 			mu.Lock(id, source)
 			mu.Unlock()
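The benchmark change follows from the constructor change: a zero-value DRWMutex no longer works, since writeLocks must now be sized via GetLockersFn, so each parallel worker obtains its mutex from NewDRWMutex (ds, id, and source are the package-level test fixtures). A condensed sketch of the new per-worker pattern, without the wrapper struct:

	b.RunParallel(func(pb *testing.PB) {
		mu := NewDRWMutex(context.Background(), "", ds)
		for pb.Next() {
			mu.Lock(id, source)
			mu.Unlock()
		}
	})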

View File

@@ -41,6 +41,14 @@ func newClient(addr, endpoint string) NetLocker {
 	}
 }

+// IsOnline returns whether the underlying rpc connection is up.
+func (rpcClient *ReconnectRPCClient) IsOnline() bool {
+	rpcClient.mutex.Lock()
+	defer rpcClient.mutex.Unlock()
+	// If the rpc client has not connected yet, report it as offline.
+	return rpcClient.rpc != nil
+}
+
 // Close closes the underlying socket file descriptor.
 func (rpcClient *ReconnectRPCClient) Close() error {
 	rpcClient.mutex.Lock()
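IsOnline gives callers a cheap way to tell reachable lockers from ones whose connection has dropped, without issuing a lock call. A hypothetical consumer (not in this commit):

	// onlineCount reports how many lockers are currently reachable.
	func onlineCount(lockers []NetLocker) (n int) {
		for _, l := range lockers {
			if l.IsOnline() {
				n++
			}
		}
		return n
	}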

View File

@@ -56,4 +56,7 @@ type NetLocker interface {
 	// Close closes any underlying connection to the service endpoint
 	Close() error

+	// Is the underlying connection online? (always true for local lockers)
+	IsOnline() bool
 }
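For an in-process locker there is no connection to lose, so the method reduces to a constant. A sketch of that end of the contract (localLocker is a hypothetical implementation, not shown in this diff):

	// IsOnline always reports true: a local locker cannot go offline.
	func (l *localLocker) IsOnline() bool {
		return true
	}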