Vendorize rpc reconnect changes from minio/dsync (#2405)

This commit is contained in:
Krishnan Parthasarathi 2016-08-11 00:20:19 -07:00 committed by Harshavardhana
parent 61af764f8a
commit 64a7e6992e
3 changed files with 38 additions and 23 deletions

View File

@ -34,7 +34,6 @@ type DMutex struct {
uids []string // Array of uids for verification of sending correct release messages uids []string // Array of uids for verification of sending correct release messages
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
// TODO: Decide: create per object or create once for whole class
} }
type Granted struct { type Granted struct {
@ -43,15 +42,25 @@ type Granted struct {
uid string uid string
} }
// Connect to respective lock server nodes on the first Lock() call.
func connectLazy(dm *DMutex) { func connectLazy(dm *DMutex) {
if clnts == nil { if clnts == nil {
clnts = make([]*rpc.Client, n) panic("rpc client connections weren't initialized.")
} }
for i := range clnts { for i := range clnts {
if clnts[i] == nil { if clnts[i].rpc != nil {
// pass in unique path (as required by server.HandleHTTP() continue
clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
} }
// Pass in unique path (as required by server.HandleHTTP().
// Ignore failure to connect, the lock server node may join the
// cluster later.
clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
if err != nil {
clnts[i].SetRPC(nil)
continue
}
clnts[i].SetRPC(clnt)
} }
} }
@ -69,7 +78,6 @@ func (dm *DMutex) Lock() {
runs, backOff := 1, 1 runs, backOff := 1, 1
for { for {
// TODO: Implement reconnect
connectLazy(dm) connectLazy(dm)
// create temp arrays on stack // create temp arrays on stack
@ -130,18 +138,15 @@ func (dm *DMutex) tryLockTimeout() bool {
// lock tries to acquire the distributed lock, returning true or false // lock tries to acquire the distributed lock, returning true or false
// //
func lock(clnts []*rpc.Client, locks *[]bool, uids *[]string, lockName string) bool { func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string) bool {
// Create buffered channel of quorum size // Create buffered channel of quorum size
ch := make(chan Granted, n/2+1) ch := make(chan Granted, n/2+1)
for index, c := range clnts { for index, c := range clnts {
if c == nil {
continue
}
// broadcast lock request to all nodes // broadcast lock request to all nodes
go func(index int, c *rpc.Client) { go func(index int, c *RPCClient) {
// All client methods issuing RPCs are thread-safe and goroutine-safe, // All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running go routines. // i.e. it is safe to call them from multiple concurrently running go routines.
var status bool var status bool
@ -153,6 +158,11 @@ func lock(clnts []*rpc.Client, locks *[]bool, uids *[]string, lockName string) b
// TODO: Get UIOD again // TODO: Get UIOD again
uid = "" uid = ""
} else { } else {
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
if IsRPCError(err) {
clnts[index].SetRPC(nil)
}
// silently ignore error, retry later // silently ignore error, retry later
} }
@ -239,7 +249,7 @@ func quorumMet(locks *[]bool) bool {
} }
// releaseAll releases all locks that are marked as locked // releaseAll releases all locks that are marked as locked
func releaseAll(clnts []*rpc.Client, locks *[]bool, ids *[]string, lockName string) { func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string) {
for lock := 0; lock < n; lock++ { for lock := 0; lock < n; lock++ {
if (*locks)[lock] { if (*locks)[lock] {
@ -297,7 +307,7 @@ func (dm *DMutex) Unlock() {
} }
// sendRelease sends a release message to a node that previously granted a lock // sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c *rpc.Client, name, uid string) { func sendRelease(c *RPCClient, name, uid string) {
// All client methods issuing RPCs are thread-safe and goroutine-safe, // All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines. // i.e. it is safe to call them from multiple concurrently running goroutines.

View File

@ -16,10 +16,7 @@
package dsync package dsync
import ( import "errors"
"errors"
"net/rpc"
)
const RpcPath = "/dsync" const RpcPath = "/dsync"
const DebugPath = "/debug" const DebugPath = "/debug"
@ -29,15 +26,18 @@ const DefaultPath = "/rpc/dsync"
var n int var n int
var nodes []string var nodes []string
var rpcPaths []string var rpcPaths []string
var clnts []*rpc.Client var clnts []*RPCClient
func closeClients(clients []*rpc.Client) { func closeClients(clients []*RPCClient) {
for _, clnt := range clients { for _, clnt := range clients {
clnt.Close() clnt.Close()
} }
} }
// Same as SetNodes, but takes a slice of rpc paths as argument different from the package-level default. // SetNodesWithPath - initializes package-level global state variables such as
// nodes, rpcPaths, clnts.
// N B - This function should be called only once inside any program that uses
// dsync.
func SetNodesWithPath(nodeList []string, paths []string) (err error) { func SetNodesWithPath(nodeList []string, paths []string) (err error) {
// Validate if number of nodes is within allowable range. // Validate if number of nodes is within allowable range.
@ -54,5 +54,10 @@ func SetNodesWithPath(nodeList []string, paths []string) (err error) {
rpcPaths = make([]string, len(paths)) rpcPaths = make([]string, len(paths))
copy(rpcPaths, paths[:]) copy(rpcPaths, paths[:])
n = len(nodes) n = len(nodes)
clnts = make([]*RPCClient, n)
// Initialize node name and rpc path for each RPCClient object.
for i := range clnts {
clnts[i] = newClient(nodes[i], rpcPaths[i])
}
return nil return nil
} }

6
vendor/vendor.json vendored
View File

@ -98,10 +98,10 @@
"revisionTime": "2015-11-18T20:00:48-08:00" "revisionTime": "2015-11-18T20:00:48-08:00"
}, },
{ {
"checksumSHA1": "BqEf+ElZXcofLdav5iGfHH93vMY=", "checksumSHA1": "Ev8FdU+RSmpHQsLGzRpg5/ka7zE=",
"path": "github.com/minio/dsync", "path": "github.com/minio/dsync",
"revision": "9c1a398a7d687901939a31d50f8639b11bb3c5fe", "revision": "c84de1533e4cd403cdaaf9c4018e12c60a916e23",
"revisionTime": "2016-08-10T17:09:05Z" "revisionTime": "2016-08-10T23:17:05Z"
}, },
{ {
"path": "github.com/minio/go-homedir", "path": "github.com/minio/go-homedir",