From f3f09ed14e309db8fff14c21d2ce7dc1781362ac Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 22 Jan 2018 10:25:10 -0800 Subject: [PATCH] Fix a bug in dsync initialization and communication (#5428) In current implementation we used as many dsync clients as per number of endpoints(along with path) which is not the expected implementation. The implementation of Dsync was expected to be just for the endpoint Host alone such that if you have 4 servers and each with 4 disks we need to only have 4 dsync clients and 4 dsync servers. But we currently had 8 clients, servers which in-fact is unexpected and should be avoided. This PR brings the implementation back to its original intention. This issue was found #5160 --- cmd/lock-rpc-server.go | 57 +++++++-------- cmd/lock-rpc-server_test.go | 25 ++++--- cmd/namespace-lock.go | 38 +++++----- cmd/server-main.go | 4 +- vendor/github.com/minio/dsync/README.md | 24 ++++--- vendor/github.com/minio/dsync/drwmutex.go | 88 ++++++++++++----------- vendor/github.com/minio/dsync/dsync.go | 65 ++++++++--------- vendor/vendor.json | 6 +- 8 files changed, 159 insertions(+), 148 deletions(-) diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 92d68e514..a329af79c 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -19,7 +19,6 @@ package cmd import ( "fmt" "math/rand" - "path" "sync" "time" @@ -64,49 +63,45 @@ type lockServer struct { } // Start lock maintenance from all lock servers. -func startLockMaintenance(lockServers []*lockServer) { - for _, locker := range lockServers { - // Start loop for stale lock maintenance - go func(lk *lockServer) { - // Initialize a new ticker with a minute between each ticks. - ticker := time.NewTicker(lockMaintenanceInterval) +func startLockMaintenance(lkSrv *lockServer) { + // Start loop for stale lock maintenance + go func(lk *lockServer) { + // Initialize a new ticker with a minute between each ticks. + ticker := time.NewTicker(lockMaintenanceInterval) - // Start with random sleep time, so as to avoid "synchronous checks" between servers - time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval))) - for { - // Verifies every minute for locks held more than 2minutes. - select { - case <-globalServiceDoneCh: - // Stop the timer upon service closure and cleanup the go-routine. - ticker.Stop() - return - case <-ticker.C: - lk.lockMaintenance(lockValidityCheckInterval) - } + // Start with random sleep time, so as to avoid "synchronous checks" between servers + time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval))) + for { + // Verifies every minute for locks held more than 2minutes. + select { + case <-globalServiceDoneCh: + // Stop the timer upon service closure and cleanup the go-routine. + ticker.Stop() + return + case <-ticker.C: + lk.lockMaintenance(lockValidityCheckInterval) } - }(locker) - } + } + }(lkSrv) } // Register distributed NS lock handlers. func registerDistNSLockRouter(mux *router.Router, endpoints EndpointList) error { // Start lock maintenance from all lock servers. - startLockMaintenance(globalLockServers) + startLockMaintenance(globalLockServer) // Register initialized lock servers to their respective rpc endpoints. - return registerStorageLockers(mux, globalLockServers) + return registerStorageLockers(mux, globalLockServer) } // registerStorageLockers - register locker rpc handlers for net/rpc library clients -func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error { - for _, lockServer := range lockServers { - lockRPCServer := newRPCServer() - if err := lockRPCServer.RegisterName(lockServiceName, lockServer); err != nil { - return errors.Trace(err) - } - lockRouter := mux.PathPrefix(minioReservedBucketPath).Subrouter() - lockRouter.Path(path.Join(lockServicePath, lockServer.ll.serviceEndpoint)).Handler(lockRPCServer) +func registerStorageLockers(mux *router.Router, lkSrv *lockServer) error { + lockRPCServer := newRPCServer() + if err := lockRPCServer.RegisterName(lockServiceName, lkSrv); err != nil { + return errors.Trace(err) } + lockRouter := mux.PathPrefix(minioReservedBucketPath).Subrouter() + lockRouter.Path(lockServicePath).Handler(lockRPCServer) return nil } diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 0f4dfb2e3..da8256c3e 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -425,8 +425,8 @@ func TestLockRpcServerExpired(t *testing.T) { } } -// Test initialization of lock servers. -func TestLockServers(t *testing.T) { +// Test initialization of lock server. +func TestLockServerInit(t *testing.T) { if runtime.GOOS == globalWindowsOSName { return } @@ -438,8 +438,10 @@ func TestLockServers(t *testing.T) { defer os.RemoveAll(rootPath) currentIsDistXL := globalIsDistXL + currentLockServer := globalLockServer defer func() { globalIsDistXL = currentIsDistXL + globalLockServer = currentLockServer }() case1Endpoints := mustGetNewEndpointList( @@ -468,26 +470,27 @@ func TestLockServers(t *testing.T) { globalMinioHost = "" testCases := []struct { - isDistXL bool - endpoints EndpointList - totalLockServers int + isDistXL bool + endpoints EndpointList }{ // Test - 1 one lock server initialized. - {true, case1Endpoints, 1}, - // Test - 2 two servers possible. - {true, case2Endpoints, 2}, + {true, case1Endpoints}, + // Test - similar endpoint hosts should + // converge to single lock server + // initialized. + {true, case2Endpoints}, } // Validates lock server initialization. for i, testCase := range testCases { globalIsDistXL = testCase.isDistXL - globalLockServers = nil + globalLockServer = nil _, _ = newDsyncNodes(testCase.endpoints) if err != nil { t.Fatalf("Got unexpected error initializing lock servers: %v", err) } - if len(globalLockServers) != testCase.totalLockServers { - t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(globalLockServers)) + if globalLockServer == nil && testCase.isDistXL { + t.Errorf("Test %d: Expected initialized lockServer, but got uninitialized", i+1) } } } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 9d7f58369..c6df980ba 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -28,13 +28,17 @@ import ( "github.com/minio/dsync" "github.com/minio/lsync" + "github.com/minio/minio-go/pkg/set" ) // Global name space lock. var globalNSMutex *nsLockMap -// Global lock servers -var globalLockServers []*lockServer +// Global lock server one per server. +var globalLockServer *lockServer + +// Instance of dsync for distributed clients. +var globalDsync *dsync.Dsync // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { @@ -56,39 +60,41 @@ type RWLockerSync interface { // Returns lock clients and the node index for the current server. func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) { cred := globalServerConfig.GetCredential() - clnts = make([]dsync.NetLocker, len(endpoints)) myNode = -1 - for index, endpoint := range endpoints { + seenHosts := set.NewStringSet() + for _, endpoint := range endpoints { + if seenHosts.Contains(endpoint.Host) { + continue + } + seenHosts.Add(endpoint.Host) if !endpoint.IsLocal { // For a remote endpoints setup a lock RPC client. - clnts[index] = newLockRPCClient(authConfig{ + clnts = append(clnts, newLockRPCClient(authConfig{ accessKey: cred.AccessKey, secretKey: cred.SecretKey, serverAddr: endpoint.Host, secureConn: globalIsSSL, - serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath, endpoint.Path), + serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath), serviceName: lockServiceName, - }) + })) continue } // Local endpoint - if myNode == -1 { - myNode = index - } + myNode = len(clnts) + // For a local endpoint, setup a local lock server to // avoid network requests. localLockServer := lockServer{ AuthRPCServer: AuthRPCServer{}, ll: localLocker{ - mutex: sync.Mutex{}, - serviceEndpoint: endpoint.Path, serverAddr: endpoint.Host, + serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath), lockMap: make(map[string][]lockRequesterInfo), }, } - globalLockServers = append(globalLockServers, &localLockServer) - clnts[index] = &(localLockServer.ll) + globalLockServer = &localLockServer + clnts = append(clnts, &(localLockServer.ll)) } return clnts, myNode @@ -149,7 +155,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock nsLk = &nsLock{ RWLockerSync: func() RWLockerSync { if n.isDistXL { - return dsync.NewDRWMutex(pathJoin(volume, path)) + return dsync.NewDRWMutex(pathJoin(volume, path), globalDsync) } return &lsync.LRWMutex{} }(), @@ -303,7 +309,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) { // are blocking can now proceed as normal and any new locks will also // participate normally. if n.isDistXL { // For distributed mode, broadcast ForceUnlock message. - dsync.NewDRWMutex(pathJoin(volume, path)).ForceUnlock() + dsync.NewDRWMutex(pathJoin(volume, path), globalDsync).ForceUnlock() } param := nsParam{volume, path} diff --git a/cmd/server-main.go b/cmd/server-main.go index d59a1b2aa..1a67dfe77 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -186,8 +186,8 @@ func serverMain(ctx *cli.Context) { // Set nodes for dsync for distributed setup. if globalIsDistXL { - clnts, myNode := newDsyncNodes(globalEndpoints) - fatalIf(dsync.Init(clnts, myNode), "Unable to initialize distributed locking clients") + globalDsync, err = dsync.New(newDsyncNodes(globalEndpoints)) + fatalIf(err, "Unable to initialize distributed locking clients") } // Initialize name space lock. diff --git a/vendor/github.com/minio/dsync/README.md b/vendor/github.com/minio/dsync/README.md index 6bc3a8fdd..95af681b7 100644 --- a/vendor/github.com/minio/dsync/README.md +++ b/vendor/github.com/minio/dsync/README.md @@ -87,22 +87,26 @@ The system can be pushed to 75K locks/sec at 50% CPU load. Usage ----- +> NOTE: Previously if you were using `dsync.Init([]NetLocker, nodeIndex)` to initialize dsync has +been changed to `dsync.New([]NetLocker, nodeIndex)` which returns a `*Dsync` object to be used in +every instance of `NewDRWMutex("test", *Dsync)` + ### Exclusive lock Here is a simple example showing how to protect a single resource (drop-in replacement for `sync.Mutex`): -``` +```go import ( - "github.com/minio/dsync" + "github.com/minio/dsync" ) func lockSameResource() { - // Create distributed mutex to protect resource 'test' - dm := dsync.NewDRWMutex("test") + // Create distributed mutex to protect resource 'test' + dm := dsync.NewDRWMutex("test", ds) dm.Lock() - log.Println("first lock granted") + log.Println("first lock granted") // Release 1st lock after 5 seconds go func() { @@ -111,10 +115,10 @@ func lockSameResource() { dm.Unlock() }() - // Try to acquire lock again, will block until initial lock is released - log.Println("about to lock same resource again...") + // Try to acquire lock again, will block until initial lock is released + log.Println("about to lock same resource again...") dm.Lock() - log.Println("second lock granted") + log.Println("second lock granted") time.Sleep(2 * time.Second) dm.Unlock() @@ -137,7 +141,7 @@ DRWMutex also supports multiple simultaneous read locks as shown below (analogou ``` func twoReadLocksAndSingleWriteLock() { - drwm := dsync.NewDRWMutex("resource") + drwm := dsync.NewDRWMutex("resource", ds) drwm.RLock() log.Println("1st read lock acquired, waiting...") @@ -160,7 +164,7 @@ func twoReadLocksAndSingleWriteLock() { log.Println("Trying to acquire write lock, waiting...") drwm.Lock() log.Println("Write lock acquired, waiting...") - + time.Sleep(3 * time.Second) drwm.Unlock() diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index 719874e7f..b2d3ee61d 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -49,6 +49,7 @@ type DRWMutex struct { writeLocks []string // Array of nodes that granted a write lock readersLocks [][]string // Array of array of nodes that granted reader locks m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node + clnt *Dsync } // Granted - represents a structure of a granted lock. @@ -66,10 +67,11 @@ func isLocked(uid string) bool { } // NewDRWMutex - initializes a new dsync RW mutex. -func NewDRWMutex(name string) *DRWMutex { +func NewDRWMutex(name string, clnt *Dsync) *DRWMutex { return &DRWMutex{ Name: name, - writeLocks: make([]string, dnodeCount), + writeLocks: make([]string, clnt.dNodeCount), + clnt: clnt, } } @@ -128,10 +130,10 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked // Use incremental back-off algorithm for repeated attempts to acquire the lock for range newRetryTimerSimple(doneCh) { // Create temp array on stack. - locks := make([]string, dnodeCount) + locks := make([]string, dm.clnt.dNodeCount) // Try to acquire the lock. - success := lock(clnts, &locks, dm.Name, isReadLock) + success := lock(dm.clnt, &locks, dm.Name, isReadLock) if success { dm.m.Lock() defer dm.m.Unlock() @@ -139,7 +141,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked // If success, copy array to object if isReadLock { // Append new array of strings at the end - dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount)) + dm.readersLocks = append(dm.readersLocks, make([]string, dm.clnt.dNodeCount)) // and copy stack array into last spot copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:]) } else { @@ -158,14 +160,14 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked } // lock tries to acquire the distributed lock, returning true or false. -func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) bool { +func lock(ds *Dsync, locks *[]string, lockName string, isReadLock bool) bool { // Create buffered channel of size equal to total number of nodes. - ch := make(chan Granted, dnodeCount) + ch := make(chan Granted, ds.dNodeCount) defer close(ch) var wg sync.WaitGroup - for index, c := range clnts { + for index, c := range ds.rpcClnts { wg.Add(1) // broadcast lock request to all nodes @@ -181,8 +183,8 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) args := LockArgs{ UID: uid, Resource: lockName, - ServerAddr: clnts[ownNode].ServerAddr(), - ServiceEndpoint: clnts[ownNode].ServiceEndpoint(), + ServerAddr: ds.rpcClnts[ds.ownNode].ServerAddr(), + ServiceEndpoint: ds.rpcClnts[ds.ownNode].ServiceEndpoint(), } var locked bool @@ -222,7 +224,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) done := false timeout := time.After(DRWMutexAcquireTimeout) - for ; i < dnodeCount; i++ { // Loop until we acquired all locks + for ; i < ds.dNodeCount; i++ { // Loop until we acquired all locks select { case grant := <-ch: @@ -231,22 +233,22 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) (*locks)[grant.index] = grant.lockUID } else { locksFailed++ - if !isReadLock && locksFailed > dnodeCount-dquorum || - isReadLock && locksFailed > dnodeCount-dquorumReads { + if !isReadLock && locksFailed > ds.dNodeCount-ds.dquorum || + isReadLock && locksFailed > ds.dNodeCount-ds.dquorumReads { // We know that we are not going to get the lock anymore, // so exit out and release any locks that did get acquired done = true // Increment the number of grants received from the buffered channel. i++ - releaseAll(clnts, locks, lockName, isReadLock) + releaseAll(ds, locks, lockName, isReadLock) } } case <-timeout: done = true // timeout happened, maybe one of the nodes is slow, count // number of locks to check whether we have quorum or not - if !quorumMet(locks, isReadLock) { - releaseAll(clnts, locks, lockName, isReadLock) + if !quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads) { + releaseAll(ds, locks, lockName, isReadLock) } } @@ -256,7 +258,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) } // Count locks in order to determine whether we have quorum or not - quorum = quorumMet(locks, isReadLock) + quorum = quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads) // Signal that we have the quorum wg.Done() @@ -264,11 +266,11 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) // Wait for the other responses and immediately release the locks // (do not add them to the locks array because the DRWMutex could // already has been unlocked again by the original calling thread) - for ; i < dnodeCount; i++ { + for ; i < ds.dNodeCount; i++ { grantToBeReleased := <-ch if grantToBeReleased.isLocked() { // release lock - sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock) + sendRelease(ds, ds.rpcClnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock) } } }(isReadLock) @@ -276,9 +278,9 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) wg.Wait() // Verify that localhost server is actively participating in the lock (the lock maintenance relies on this fact) - if quorum && !isLocked((*locks)[ownNode]) { + if quorum && !isLocked((*locks)[ds.ownNode]) { // If not, release lock (and try again later) - releaseAll(clnts, locks, lockName, isReadLock) + releaseAll(ds, locks, lockName, isReadLock) quorum = false } @@ -286,7 +288,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) } // quorumMet determines whether we have acquired the required quorum of underlying locks or not -func quorumMet(locks *[]string, isReadLock bool) bool { +func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool { count := 0 for _, uid := range *locks { @@ -295,21 +297,21 @@ func quorumMet(locks *[]string, isReadLock bool) bool { } } - var quorum bool + var metQuorum bool if isReadLock { - quorum = count >= dquorumReads + metQuorum = count >= quorumReads } else { - quorum = count >= dquorum + metQuorum = count >= quorum } - return quorum + return metQuorum } // releaseAll releases all locks that are marked as locked -func releaseAll(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) { - for lock := 0; lock < dnodeCount; lock++ { +func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool) { + for lock := 0; lock < ds.dNodeCount; lock++ { if isLocked((*locks)[lock]) { - sendRelease(clnts[lock], lockName, (*locks)[lock], isReadLock) + sendRelease(ds, ds.rpcClnts[lock], lockName, (*locks)[lock], isReadLock) (*locks)[lock] = "" } } @@ -321,7 +323,7 @@ func releaseAll(clnts []NetLocker, locks *[]string, lockName string, isReadLock func (dm *DRWMutex) Unlock() { // create temp array on stack - locks := make([]string, dnodeCount) + locks := make([]string, dm.clnt.dNodeCount) { dm.m.Lock() @@ -342,11 +344,11 @@ func (dm *DRWMutex) Unlock() { // Copy write locks to stack array copy(locks, dm.writeLocks[:]) // Clear write locks array - dm.writeLocks = make([]string, dnodeCount) + dm.writeLocks = make([]string, dm.clnt.dNodeCount) } isReadLock := false - unlock(locks, dm.Name, isReadLock) + unlock(dm.clnt, locks, dm.Name, isReadLock) } // RUnlock releases a read lock held on dm. @@ -355,7 +357,7 @@ func (dm *DRWMutex) Unlock() { func (dm *DRWMutex) RUnlock() { // create temp array on stack - locks := make([]string, dnodeCount) + locks := make([]string, dm.clnt.dNodeCount) { dm.m.Lock() @@ -370,19 +372,19 @@ func (dm *DRWMutex) RUnlock() { } isReadLock := true - unlock(locks, dm.Name, isReadLock) + unlock(dm.clnt, locks, dm.Name, isReadLock) } -func unlock(locks []string, name string, isReadLock bool) { +func unlock(ds *Dsync, locks []string, name string, isReadLock bool) { // We don't need to synchronously wait until we have released all the locks (or the quorum) // (a subsequent lock will retry automatically in case it would fail to get quorum) - for index, c := range clnts { + for index, c := range ds.rpcClnts { if isLocked(locks[index]) { // broadcast lock release to all nodes that granted the lock - sendRelease(c, name, locks[index], isReadLock) + sendRelease(ds, c, name, locks[index], isReadLock) } } } @@ -394,24 +396,24 @@ func (dm *DRWMutex) ForceUnlock() { defer dm.m.Unlock() // Clear write locks array - dm.writeLocks = make([]string, dnodeCount) + dm.writeLocks = make([]string, dm.clnt.dNodeCount) // Clear read locks array dm.readersLocks = nil } - for _, c := range clnts { + for _, c := range dm.clnt.rpcClnts { // broadcast lock release to all nodes that granted the lock - sendRelease(c, dm.Name, "", false) + sendRelease(dm.clnt, c, dm.Name, "", false) } } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c NetLocker, name, uid string, isReadLock bool) { +func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) { args := LockArgs{ UID: uid, Resource: name, - ServerAddr: clnts[ownNode].ServerAddr(), - ServiceEndpoint: clnts[ownNode].ServiceEndpoint(), + ServerAddr: ds.rpcClnts[ds.ownNode].ServerAddr(), + ServiceEndpoint: ds.rpcClnts[ds.ownNode].ServiceEndpoint(), } if len(uid) == 0 { if _, err := c.ForceUnlock(args); err != nil { diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index ba027d54f..f88438ab5 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -16,51 +16,52 @@ package dsync -import "errors" +import ( + "errors" +) -// Number of nodes participating in the distributed locking. -var dnodeCount int +// Dsync represents dsync client object which is initialized with +// authenticated clients, used to initiate lock RPC calls. +type Dsync struct { + // Number of nodes participating in the distributed locking. + dNodeCount int -// List of rpc client objects, one per lock server. -var clnts []NetLocker + // List of rpc client objects, one per lock server. + rpcClnts []NetLocker -// Index into rpc client array for server running on localhost -var ownNode int + // Index into rpc client array for server running on localhost + ownNode int -// Simple majority based quorum, set to dNodeCount/2+1 -var dquorum int + // Simple majority based quorum, set to dNodeCount/2+1 + dquorum int -// Simple quorum for read operations, set to dNodeCount/2 -var dquorumReads int + // Simple quorum for read operations, set to dNodeCount/2 + dquorumReads int +} -// Init - initializes package-level global state variables such as clnts. -// N B - This function should be called only once inside any program -// that uses dsync. -func Init(rpcClnts []NetLocker, rpcOwnNode int) (err error) { - - // Validate if number of nodes is within allowable range. - if dnodeCount != 0 { - return errors.New("Cannot reinitialize dsync package") - } +// New - initializes a new dsync object with input rpcClnts. +func New(rpcClnts []NetLocker, rpcOwnNode int) (*Dsync, error) { if len(rpcClnts) < 4 { - return errors.New("Dsync is not designed for less than 4 nodes") + return nil, errors.New("Dsync is not designed for less than 4 nodes") } else if len(rpcClnts) > 16 { - return errors.New("Dsync is not designed for more than 16 nodes") + return nil, errors.New("Dsync is not designed for more than 16 nodes") } else if len(rpcClnts)%2 != 0 { - return errors.New("Dsync is not designed for an uneven number of nodes") + return nil, errors.New("Dsync is not designed for an uneven number of nodes") } if rpcOwnNode > len(rpcClnts) { - return errors.New("Index for own node is too large") + return nil, errors.New("Index for own node is too large") } - dnodeCount = len(rpcClnts) - dquorum = dnodeCount/2 + 1 - dquorumReads = dnodeCount / 2 - // Initialize node name and rpc path for each NetLocker object. - clnts = make([]NetLocker, dnodeCount) - copy(clnts, rpcClnts) + ds := &Dsync{} + ds.dNodeCount = len(rpcClnts) + ds.dquorum = ds.dNodeCount/2 + 1 + ds.dquorumReads = ds.dNodeCount / 2 + ds.ownNode = rpcOwnNode - ownNode = rpcOwnNode - return nil + // Initialize node name and rpc path for each NetLocker object. + ds.rpcClnts = make([]NetLocker, ds.dNodeCount) + copy(ds.rpcClnts, rpcClnts) + + return ds, nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index ae0383c6a..460f54194 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -380,10 +380,10 @@ "revisionTime": "2017-02-27T07:32:28Z" }, { - "checksumSHA1": "hQ8i4UPTbFW68oPJP3uFxYTLfxk=", + "checksumSHA1": "qhWQM7xmqaxFqADNTj8YPjE/8Ws=", "path": "github.com/minio/dsync", - "revision": "a26b9de6c8006208d10a9517720d3212b42c374e", - "revisionTime": "2017-05-25T17:53:53Z" + "revision": "ed0989bc6c7b199f749fa6be0b7ee98d689b88c7", + "revisionTime": "2017-11-22T09:16:00Z" }, { "path": "github.com/minio/go-homedir",