From 64a7e6992e2ddea6fe74a410ae37cb918208f4d8 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Thu, 11 Aug 2016 00:20:19 -0700 Subject: [PATCH] Vendorize rpc reconnect changes from minio/dsync (#2405) --- vendor/github.com/minio/dsync/dmutex.go | 36 ++++++++++++++++--------- vendor/github.com/minio/dsync/dsync.go | 19 ++++++++----- vendor/vendor.json | 6 ++--- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/vendor/github.com/minio/dsync/dmutex.go b/vendor/github.com/minio/dsync/dmutex.go index 474d13ff8..7fcdc0927 100644 --- a/vendor/github.com/minio/dsync/dmutex.go +++ b/vendor/github.com/minio/dsync/dmutex.go @@ -34,7 +34,6 @@ type DMutex struct { uids []string // Array of uids for verification of sending correct release messages m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node - // TODO: Decide: create per object or create once for whole class } type Granted struct { @@ -43,15 +42,25 @@ type Granted struct { uid string } +// Connect to respective lock server nodes on the first Lock() call. func connectLazy(dm *DMutex) { if clnts == nil { - clnts = make([]*rpc.Client, n) + panic("rpc client connections weren't initialized.") } for i := range clnts { - if clnts[i] == nil { - // pass in unique path (as required by server.HandleHTTP() - clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) + if clnts[i].rpc != nil { + continue } + + // Pass in unique path (as required by server.HandleHTTP(). + // Ignore failure to connect, the lock server node may join the + // cluster later. + clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) + if err != nil { + clnts[i].SetRPC(nil) + continue + } + clnts[i].SetRPC(clnt) } } @@ -69,7 +78,6 @@ func (dm *DMutex) Lock() { runs, backOff := 1, 1 for { - // TODO: Implement reconnect connectLazy(dm) // create temp arrays on stack @@ -130,18 +138,15 @@ func (dm *DMutex) tryLockTimeout() bool { // lock tries to acquire the distributed lock, returning true or false // -func lock(clnts []*rpc.Client, locks *[]bool, uids *[]string, lockName string) bool { +func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string) bool { // Create buffered channel of quorum size ch := make(chan Granted, n/2+1) for index, c := range clnts { - if c == nil { - continue - } // broadcast lock request to all nodes - go func(index int, c *rpc.Client) { + go func(index int, c *RPCClient) { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running go routines. var status bool @@ -153,6 +158,11 @@ func lock(clnts []*rpc.Client, locks *[]bool, uids *[]string, lockName string) b // TODO: Get UIOD again uid = "" } else { + // If rpc call failed due to connection related errors, reset rpc.Client object + // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. + if IsRPCError(err) { + clnts[index].SetRPC(nil) + } // silently ignore error, retry later } @@ -239,7 +249,7 @@ func quorumMet(locks *[]bool) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(clnts []*rpc.Client, locks *[]bool, ids *[]string, lockName string) { +func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string) { for lock := 0; lock < n; lock++ { if (*locks)[lock] { @@ -297,7 +307,7 @@ func (dm *DMutex) Unlock() { } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c *rpc.Client, name, uid string) { +func sendRelease(c *RPCClient, name, uid string) { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running goroutines. diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index fd44771da..66610bdf1 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -16,10 +16,7 @@ package dsync -import ( - "errors" - "net/rpc" -) +import "errors" const RpcPath = "/dsync" const DebugPath = "/debug" @@ -29,15 +26,18 @@ const DefaultPath = "/rpc/dsync" var n int var nodes []string var rpcPaths []string -var clnts []*rpc.Client +var clnts []*RPCClient -func closeClients(clients []*rpc.Client) { +func closeClients(clients []*RPCClient) { for _, clnt := range clients { clnt.Close() } } -// Same as SetNodes, but takes a slice of rpc paths as argument different from the package-level default. +// SetNodesWithPath - initializes package-level global state variables such as +// nodes, rpcPaths, clnts. +// N B - This function should be called only once inside any program that uses +// dsync. func SetNodesWithPath(nodeList []string, paths []string) (err error) { // Validate if number of nodes is within allowable range. @@ -54,5 +54,10 @@ func SetNodesWithPath(nodeList []string, paths []string) (err error) { rpcPaths = make([]string, len(paths)) copy(rpcPaths, paths[:]) n = len(nodes) + clnts = make([]*RPCClient, n) + // Initialize node name and rpc path for each RPCClient object. + for i := range clnts { + clnts[i] = newClient(nodes[i], rpcPaths[i]) + } return nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index a3893f063..ae0ed47ac 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -98,10 +98,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "BqEf+ElZXcofLdav5iGfHH93vMY=", + "checksumSHA1": "Ev8FdU+RSmpHQsLGzRpg5/ka7zE=", "path": "github.com/minio/dsync", - "revision": "9c1a398a7d687901939a31d50f8639b11bb3c5fe", - "revisionTime": "2016-08-10T17:09:05Z" + "revision": "c84de1533e4cd403cdaaf9c4018e12c60a916e23", + "revisionTime": "2016-08-10T23:17:05Z" }, { "path": "github.com/minio/go-homedir",