From bda6bcd5be84eaf95920fbf8581d0a82a7da9a1f Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 22 Aug 2016 11:01:21 -0700 Subject: [PATCH] Layered rpc-client implementation (#2512) --- cmd/auth-rpc-client.go | 79 ++++++++++++ cmd/lock-rpc-server.go | 96 +++++++++++---- cmd/namespace-lock.go | 6 +- cmd/storage-rpc-client.go | 112 +++++++----------- cmd/storage-rpc-server-datatypes.go | 54 ++++++--- cmd/storage-rpc-server.go | 4 +- vendor/github.com/minio/dsync/drwmutex.go | 73 ++++-------- vendor/github.com/minio/dsync/dsync.go | 29 ++--- .../minio/dsync/rpc-client-interface.go | 29 +++++ vendor/github.com/minio/dsync/rpc-client.go | 77 ------------ vendor/vendor.json | 6 +- 11 files changed, 307 insertions(+), 258 deletions(-) create mode 100644 cmd/auth-rpc-client.go create mode 100644 vendor/github.com/minio/dsync/rpc-client-interface.go delete mode 100644 vendor/github.com/minio/dsync/rpc-client.go diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go new file mode 100644 index 000000000..3018d23b9 --- /dev/null +++ b/cmd/auth-rpc-client.go @@ -0,0 +1,79 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "time" + + "github.com/minio/dsync" +) + +// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. +type AuthRPCClient struct { + rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client + cred credential // AccessKey and SecretKey + token string // JWT based token + tstamp time.Time // Timestamp as received on Login RPC. + loginMethod string // RPC service name for authenticating using JWT +} + +// newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect. +func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *AuthRPCClient { + return &AuthRPCClient{ + rpc: newClient(node, rpcPath), + cred: cred, + loginMethod: loginMethod, + } +} + +// Close - closes underlying rpc connection. +func (authClient *AuthRPCClient) Close() error { + // reset token on closing a connection + authClient.token = "" + return authClient.rpc.Close() +} + +// Login - a jwt based authentication is performed with rpc server. +func (authClient *AuthRPCClient) Login() (string, time.Time, error) { + reply := RPCLoginReply{} + if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{ + Username: authClient.cred.AccessKeyID, + Password: authClient.cred.SecretAccessKey, + }, &reply); err != nil { + return "", time.Time{}, err + } + return reply.Token, reply.Timestamp, nil +} + +// Call - If rpc connection isn't established yet since previous disconnect, +// connection is established, a jwt authenticated login is performed and then +// the call is performed. +func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) (err error) { + if authClient.token == "" { + token, tstamp, err := authClient.Login() + if err != nil { + return err + } + // set token, time stamp as received from a successful login call. + authClient.token = token + authClient.tstamp = tstamp + // Update the RPC call's token with that received from the recent login call. + args.SetToken(token) + args.SetTimestamp(tstamp) + } + return authClient.rpc.Call(serviceMethod, args, reply) +} diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 30ac8d246..bc757d160 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -17,36 +17,82 @@ package cmd import ( + "errors" "fmt" "net/rpc" "path" "strings" "sync" + "time" router "github.com/gorilla/mux" ) const lockRPCPath = "/minio/lock" +// LockArgs besides lock name, holds Token and Timestamp for session +// authentication and validation server restart. +type LockArgs struct { + Name string + Token string + Timestamp time.Time +} + +// SetToken - sets the token to the supplied value. +func (l *LockArgs) SetToken(token string) { + l.Token = token +} + +// SetTimestamp - sets the timestamp to the supplied value. +func (l *LockArgs) SetTimestamp(tstamp time.Time) { + l.Timestamp = tstamp +} + type lockServer struct { rpcPath string mutex sync.Mutex // e.g, when a Lock(name) is held, map[string][]bool{"name" : []bool{true}} // when one or more RLock() is held, map[string][]bool{"name" : []bool{false, false}} - lockMap map[string][]bool + lockMap map[string][]bool + timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart. +} + +func (l *lockServer) verifyTimestamp(tstamp time.Time) bool { + return l.timestamp.Equal(tstamp) } /// Distributed lock handlers +// LoginHandler - handles LoginHandler RPC call. +func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { + jwt, err := newJWT(defaultTokenExpiry) + if err != nil { + return err + } + if err = jwt.Authenticate(args.Username, args.Password); err != nil { + return err + } + token, err := jwt.GenerateToken(args.Username) + if err != nil { + return err + } + reply.Token = token + reply.Timestamp = l.timestamp + return nil +} + // LockHandler - rpc handler for lock operation. -func (l *lockServer) Lock(name *string, reply *bool) error { +func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - _, ok := l.lockMap[*name] + if l.verifyTimestamp(args.Timestamp) { + return errors.New("Timestamps don't match, server may have restarted.") + } + _, ok := l.lockMap[args.Name] // No locks held on the given name. if !ok { *reply = true - l.lockMap[*name] = []bool{true} + l.lockMap[args.Name] = []bool{true} return nil } // Either a read or write lock is held on the given name. @@ -55,56 +101,65 @@ func (l *lockServer) Lock(name *string, reply *bool) error { } // UnlockHandler - rpc handler for unlock operation. -func (l *lockServer) Unlock(name *string, reply *bool) error { +func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - _, ok := l.lockMap[*name] + if l.verifyTimestamp(args.Timestamp) { + return errors.New("Timestamps don't match, server may have restarted.") + } + _, ok := l.lockMap[args.Name] // No lock is held on the given name, there must be some issue at the lock client side. if !ok { - return fmt.Errorf("Unlock attempted on an un-locked entity: %s", *name) + return fmt.Errorf("Unlock attempted on an un-locked entity: %s", args.Name) } *reply = true - delete(l.lockMap, *name) + delete(l.lockMap, args.Name) return nil } -func (l *lockServer) RLock(name *string, reply *bool) error { +func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - locksHeld, ok := l.lockMap[*name] + if l.verifyTimestamp(args.Timestamp) { + return errors.New("Timestamps don't match, server may have restarted.") + } + locksHeld, ok := l.lockMap[args.Name] // No locks held on the given name. if !ok { // First read-lock to be held on *name. - l.lockMap[*name] = []bool{false} + l.lockMap[args.Name] = []bool{false} *reply = true } else if len(locksHeld) == 1 && locksHeld[0] == true { // A write-lock is held, read lock can't be granted. *reply = false } else { // Add an entry for this read lock. - l.lockMap[*name] = append(locksHeld, false) + l.lockMap[args.Name] = append(locksHeld, false) *reply = true } return nil } -func (l *lockServer) RUnlock(name *string, reply *bool) error { +func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - locksHeld, ok := l.lockMap[*name] + if l.verifyTimestamp(args.Timestamp) { + return errors.New("Timestamps don't match, server may have restarted.") + } + locksHeld, ok := l.lockMap[args.Name] if !ok { - return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", *name) + return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", args.Name) } if len(locksHeld) > 1 { // Remove one of the read locks held. locksHeld = locksHeld[1:] - l.lockMap[*name] = locksHeld + l.lockMap[args.Name] = locksHeld *reply = true } else { // Delete the map entry since this is the last read lock held // on *name. - delete(l.lockMap, *name) + delete(l.lockMap, args.Name) *reply = true } return nil @@ -136,9 +191,10 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { export = export[idx+1:] } lockServers = append(lockServers, &lockServer{ - rpcPath: export, - mutex: sync.Mutex{}, - lockMap: make(map[string][]bool), + rpcPath: export, + mutex: sync.Mutex{}, + lockMap: make(map[string][]bool), + timestamp: time.Now().UTC(), }) } } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 746de5e24..53bb5b124 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -56,8 +56,12 @@ func initDsyncNodes(disks []string, port int) (bool, error) { } // Initialize rpc lock client information only if this instance is a // distributed setup. + clnts := make([]dsync.RPC, len(disks)) + for i := 0; i < len(disks); i++ { + clnts[i] = newAuthClient(dsyncNodes[i], rpcPaths[i], serverConfig.GetCredential(), "Dsync.LoginHandler") + } if isDist { - return isDist, dsync.SetNodesWithPath(dsyncNodes, rpcPaths) + return isDist, dsync.SetNodesWithClients(clnts) } return isDist, nil } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 0fc6973ff..7a040a9ec 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -27,8 +27,7 @@ type networkStorage struct { netScheme string netAddr string netPath string - rpcClient *RPCClient - rpcToken string + rpcClient *AuthRPCClient } const ( @@ -75,22 +74,6 @@ func toStorageErr(err error) error { return err } -// Login rpc client makes an authentication request to the rpc server. -// Receives a session token which will be used for subsequent requests. -// FIXME: Currently these tokens expire in 100yrs. -func loginRPCClient(rpcClient *RPCClient) (tokenStr string, err error) { - cred := serverConfig.GetCredential() - reply := RPCLoginReply{} - if err = rpcClient.Call("Storage.LoginHandler", RPCLoginArgs{ - Username: cred.AccessKeyID, - Password: cred.SecretAccessKey, - }, &reply); err != nil { - return "", err - } - // Reply back server provided token. - return reply.Token, nil -} - // Initialize new rpc client. func newRPCClient(networkPath string) (StorageAPI, error) { // Input validation. @@ -109,16 +92,8 @@ func newRPCClient(networkPath string) (StorageAPI, error) { port := getPort(srvConfig.serverAddr) rpcAddr := netAddr + ":" + strconv.Itoa(port) // Initialize rpc client with network address and rpc path. - rpcClient := newClient(rpcAddr, rpcPath) - - token, err := loginRPCClient(rpcClient) - if err != nil { - // Close the corresponding network connection w/ server to - // avoid leaking socket file descriptor. - rpcClient.Close() - return nil, err - } - + cred := serverConfig.GetCredential() + rpcClient := newAuthClient(rpcAddr, rpcPath, cred, "Storage.LoginHandler") // Initialize network storage. ndisk := &networkStorage{ netScheme: func() string { @@ -130,7 +105,6 @@ func newRPCClient(networkPath string) (StorageAPI, error) { netAddr: netAddr, netPath: netPath, rpcClient: rpcClient, - rpcToken: token, } // Returns successfully here. @@ -143,8 +117,8 @@ func (n networkStorage) MakeVol(volume string) error { return errVolumeBusy } reply := GenericReply{} - args := GenericVolArgs{n.rpcToken, volume} - if err := n.rpcClient.Call("Storage.MakeVolHandler", args, &reply); err != nil { + args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { return toStorageErr(err) } return nil @@ -156,7 +130,7 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) { return nil, errVolumeBusy } ListVols := ListVolsReply{} - err = n.rpcClient.Call("Storage.ListVolsHandler", n.rpcToken, &ListVols) + err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, &ListVols) if err != nil { return nil, err } @@ -168,8 +142,8 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { if n.rpcClient == nil { return VolInfo{}, errVolumeBusy } - args := GenericVolArgs{n.rpcToken, volume} - if err = n.rpcClient.Call("Storage.StatVolHandler", args, &volInfo); err != nil { + args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { return VolInfo{}, toStorageErr(err) } return volInfo, nil @@ -181,8 +155,8 @@ func (n networkStorage) DeleteVol(volume string) error { return errVolumeBusy } reply := GenericReply{} - args := GenericVolArgs{n.rpcToken, volume} - if err := n.rpcClient.Call("Storage.DeleteVolHandler", args, &reply); err != nil { + args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { return toStorageErr(err) } return nil @@ -196,11 +170,11 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro return errVolumeBusy } reply := GenericReply{} - if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, - Buffer: buffer, + if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, + Buffer: buffer, }, &reply); err != nil { return toStorageErr(err) } @@ -212,10 +186,10 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er if n.rpcClient == nil { return FileInfo{}, errVolumeBusy } - if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, + if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, }, &fileInfo); err != nil { return FileInfo{}, toStorageErr(err) } @@ -230,10 +204,10 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { if n.rpcClient == nil { return nil, errVolumeBusy } - if err = n.rpcClient.Call("Storage.ReadAllHandler", ReadAllArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, + if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, }, &buf); err != nil { return nil, toStorageErr(err) } @@ -246,12 +220,12 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe return 0, errVolumeBusy } var result []byte - err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, - Offset: offset, - Size: len(buffer), + err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, + Offset: offset, + Size: len(buffer), }, &result) // Copy results to buffer. copy(buffer, result) @@ -264,10 +238,10 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro if n.rpcClient == nil { return nil, errVolumeBusy } - if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, + if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, }, &entries); err != nil { return nil, toStorageErr(err) } @@ -281,10 +255,10 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) { return errVolumeBusy } reply := GenericReply{} - if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ - Token: n.rpcToken, - Vol: volume, - Path: path, + if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ + GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, + Vol: volume, + Path: path, }, &reply); err != nil { return toStorageErr(err) } @@ -297,12 +271,12 @@ func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string return errVolumeBusy } reply := GenericReply{} - if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{ - Token: n.rpcToken, - SrcVol: srcVolume, - SrcPath: srcPath, - DstVol: dstVolume, - DstPath: dstPath, + if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ + GenericArgs: GenericArgs{Token: n.rpcClient.token}, + SrcVol: srcVolume, + SrcPath: srcPath, + DstVol: dstVolume, + DstPath: dstPath, }, &reply); err != nil { return toStorageErr(err) } diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index f2c649875..798e2b3cd 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -16,6 +16,35 @@ package cmd +import "time" + +// TokenSetter is to be implemented by types that need a way to update member that represents a token. +// e.g, See GenericArgs. +type TokenSetter interface { + SetToken(token string) + SetTimestamp(tstamp time.Time) +} + +// GenericReply represents any generic RPC reply. +type GenericReply struct { +} + +// GenericArgs represents any generic RPC arguments. +type GenericArgs struct { + Token string // Used to authenticate every RPC call. + Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair. +} + +// SetToken - sets the token to the supplied value. +func (ga *GenericArgs) SetToken(token string) { + ga.Token = token +} + +// SetTimestamp - sets the timestamp to the supplied value. +func (ga *GenericArgs) SetTimestamp(tstamp time.Time) { + ga.Timestamp = tstamp +} + // RPCLoginArgs - login username and password for RPC. type RPCLoginArgs struct { Username string @@ -27,18 +56,13 @@ type RPCLoginArgs struct { type RPCLoginReply struct { Token string ServerVersion string + Timestamp time.Time } -// GenericReply represents any generic RPC reply. -type GenericReply struct{} - -// GenericArgs represents any generic RPC arguments. -type GenericArgs struct{} - // GenericVolArgs - generic volume args. type GenericVolArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of the volume. Vol string @@ -53,8 +77,7 @@ type ListVolsReply struct { // ReadAllArgs represents read all RPC arguments. type ReadAllArgs struct { // Authentication token generated by Login. - Token string - + GenericArgs // Name of the volume. Vol string @@ -65,8 +88,7 @@ type ReadAllArgs struct { // ReadFileArgs represents read file RPC arguments. type ReadFileArgs struct { // Authentication token generated by Login. - Token string - + GenericArgs // Name of the volume. Vol string @@ -83,7 +105,7 @@ type ReadFileArgs struct { // AppendFileArgs represents append file RPC arguments. type AppendFileArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of the volume. Vol string @@ -98,7 +120,7 @@ type AppendFileArgs struct { // StatFileArgs represents stat file RPC arguments. type StatFileArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of the volume. Vol string @@ -110,7 +132,7 @@ type StatFileArgs struct { // DeleteFileArgs represents delete file RPC arguments. type DeleteFileArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of the volume. Vol string @@ -122,7 +144,7 @@ type DeleteFileArgs struct { // ListDirArgs represents list contents RPC arguments. type ListDirArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of the volume. Vol string @@ -134,7 +156,7 @@ type ListDirArgs struct { // RenameFileArgs represents rename file RPC arguments. type RenameFileArgs struct { // Authentication token generated by Login. - Token string + GenericArgs // Name of source volume. SrcVol string diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index 079b0895d..3ef7297f4 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -88,8 +88,8 @@ func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply } // ListVolsHandler - list vols handler is rpc wrapper for ListVols operation. -func (s *storageServer) ListVolsHandler(token *string, reply *ListVolsReply) error { - if !isRPCTokenValid(*token) { +func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error { + if !isRPCTokenValid(args.Token) { return errors.New("Invalid token") } vols, err := s.storage.ListVols() diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index 4460b5a1f..9b814b6c8 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -19,7 +19,6 @@ package dsync import ( "math" "math/rand" - "net/rpc" "sync" "time" ) @@ -40,6 +39,20 @@ type Granted struct { uid string } +type LockArgs struct { + Token string + Timestamp time.Time + Name string +} + +func (l *LockArgs) SetToken(token string) { + l.Token = token +} + +func (l *LockArgs) SetTimestamp(tstamp time.Time) { + l.Timestamp = tstamp +} + func NewDRWMutex(name string) *DRWMutex { return &DRWMutex{ Name: name, @@ -48,28 +61,6 @@ func NewDRWMutex(name string) *DRWMutex { } } -// Connect to respective lock server nodes on the first Lock() call. -func connectLazy() { - if clnts == nil { - panic("rpc client connections weren't initialized.") - } - for i := range clnts { - if clnts[i].rpc != nil { - continue - } - - // Pass in unique path (as required by server.HandleHTTP(). - // Ignore failure to connect, the lock server node may join the - // cluster later. - clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) - if err != nil { - clnts[i].SetRPC(nil) - continue - } - clnts[i].SetRPC(clnt) - } -} - // RLock holds a read lock on dm. // // If the lock is already in use, the calling goroutine @@ -83,7 +74,6 @@ func (dm *DRWMutex) RLock() { runs, backOff := 1, 1 for { - connectLazy() // create temp arrays on stack locks := make([]bool, dnodeCount) @@ -128,8 +118,6 @@ func (dm *DRWMutex) Lock() { runs, backOff := 1, 1 for { - connectLazy() - // create temp arrays on stack locks := make([]bool, dnodeCount) ids := make([]string, dnodeCount) @@ -161,7 +149,7 @@ func (dm *DRWMutex) Lock() { // lock tries to acquire the distributed lock, returning true or false // -func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { +func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { // Create buffered channel of quorum size ch := make(chan Granted, dquorum) @@ -169,15 +157,15 @@ func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, is for index, c := range clnts { // broadcast lock request to all nodes - go func(index int, isReadLock bool, c *RPCClient) { + go func(index int, isReadLock bool, c RPC) { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running go routines. var status bool var err error if isReadLock { - err = c.Call("Dsync.RLock", lockName, &status) + err = c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &status) } else { - err = c.Call("Dsync.Lock", lockName, &status) + err = c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &status) } locked, uid := false, "" @@ -185,14 +173,8 @@ func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, is locked = status // TODO: Get UIOD again uid = "" - } else { - // If rpc call failed due to connection related errors, reset rpc.Client object - // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. - if IsRPCError(err) { - clnts[index].SetRPC(nil) - } - // silently ignore error, retry later } + // silently ignore error, retry later ch <- Granted{index: index, locked: locked, uid: uid} @@ -277,7 +259,7 @@ func quorumMet(locks *[]bool) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { +func releaseAll(clnts []RPC, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { for lock := 0; lock < dnodeCount; lock++ { if (*locks)[lock] { @@ -337,38 +319,31 @@ func (dm *DRWMutex) Unlock() { } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c *RPCClient, name, uid string, isReadLock bool) { +func sendRelease(c RPC, name, uid string, isReadLock bool) { backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour} - go func(c *RPCClient, name, uid string) { + go func(c RPC, name, uid string) { for _, backOff := range backOffArray { - // Make sure we are connected - connectLazy() - // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running goroutines. var status bool var err error // TODO: Send UID to server if isReadLock { - if err = c.Call("Dsync.RUnlock", name, &status); err == nil { + if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil { // RUnlock delivered, exit out return } } else { - if err = c.Call("Dsync.Unlock", name, &status); err == nil { + if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil { // Unlock delivered, exit out return } } - // If rpc call failed due to connection related errors, reset rpc.Client object - // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. - c.SetRPC(nil) - // wait time.Sleep(backOff) } diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index 9172e1eef..8c28abdf4 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -26,43 +26,30 @@ const DefaultPath = "/rpc/dsync" // Number of nodes participating in the distributed locking. var dnodeCount int -// List of nodes participating. -var nodes []string - -// List of rpc paths, one per lock server. -var rpcPaths []string - // List of rpc client objects, one per lock server. -var clnts []*RPCClient +var clnts []RPC // Simple majority based quorum, set to dNodeCount/2+1 var dquorum int -// SetNodesWithPath - initializes package-level global state variables such as -// nodes, rpcPaths, clnts. +// SetNodesWithPath - initializes package-level global state variables such as clnts. // N B - This function should be called only once inside any program that uses // dsync. -func SetNodesWithPath(nodeList []string, paths []string) (err error) { +func SetNodesWithClients(rpcClnts []RPC) (err error) { // Validate if number of nodes is within allowable range. if dnodeCount != 0 { return errors.New("Cannot reinitialize dsync package") - } else if len(nodeList) < 4 { + } else if len(rpcClnts) < 4 { return errors.New("Dsync not designed for less than 4 nodes") - } else if len(nodeList) > 16 { + } else if len(rpcClnts) > 16 { return errors.New("Dsync not designed for more than 16 nodes") } - nodes = make([]string, len(nodeList)) - copy(nodes, nodeList[:]) - rpcPaths = make([]string, len(paths)) - copy(rpcPaths, paths[:]) - dnodeCount = len(nodes) + dnodeCount = len(rpcClnts) dquorum = dnodeCount/2 + 1 - clnts = make([]*RPCClient, dnodeCount) // Initialize node name and rpc path for each RPCClient object. - for i := range clnts { - clnts[i] = newClient(nodes[i], rpcPaths[i]) - } + clnts = make([]RPC, dnodeCount) + copy(clnts, rpcClnts) return nil } diff --git a/vendor/github.com/minio/dsync/rpc-client-interface.go b/vendor/github.com/minio/dsync/rpc-client-interface.go new file mode 100644 index 000000000..decf8d57b --- /dev/null +++ b/vendor/github.com/minio/dsync/rpc-client-interface.go @@ -0,0 +1,29 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dsync + +import "time" + +type TokenSetter interface { + SetToken(token string) + SetTimestamp(tstamp time.Time) +} + +type RPC interface { + Call(serviceMethod string, args TokenSetter, reply interface{}) error + Close() error +} diff --git a/vendor/github.com/minio/dsync/rpc-client.go b/vendor/github.com/minio/dsync/rpc-client.go deleted file mode 100644 index fe87866b3..000000000 --- a/vendor/github.com/minio/dsync/rpc-client.go +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dsync - -import ( - "net/rpc" - "reflect" - "sync" -) - -// Wrapper type for rpc.Client that provides connection management like -// reconnect on first failure. -type RPCClient struct { - sync.Mutex - rpc *rpc.Client - node string - rpcPath string -} - -func newClient(node, rpcPath string) *RPCClient { - return &RPCClient{ - node: node, - rpcPath: rpcPath, - } -} - -func (rpcClient *RPCClient) SetRPC(rpc *rpc.Client) { - rpcClient.Lock() - defer rpcClient.Unlock() - rpcClient.rpc = rpc -} - -func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error { - rpcClient.Lock() - defer rpcClient.Unlock() - if rpcClient.rpc == nil { - return rpc.ErrShutdown - } - err := rpcClient.rpc.Call(serviceMethod, args, reply) - return err - -} - -func (rpcClient *RPCClient) Reconnect() error { - rpcClient.Lock() - defer rpcClient.Unlock() - clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath) - if err != nil { - return err - } - rpcClient.rpc = clnt - return nil -} - -func IsRPCError(err error) bool { - // The following are net/rpc specific errors that indicate that - // the connection may have been reset. - if err == rpc.ErrShutdown || - reflect.TypeOf(err) == reflect.TypeOf((*rpc.ServerError)(nil)).Elem() { - return true - } - return false -} diff --git a/vendor/vendor.json b/vendor/vendor.json index fdc4dfacb..516d7773d 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -98,10 +98,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "kbVCnnU0gR/i8WA8Gs2I+/7kONY=", + "checksumSHA1": "UmlhYLEvnNk+1e4CEDpVZ3c5mhQ=", "path": "github.com/minio/dsync", - "revision": "8f4819554f1f4fffc2e1c8c706b23e5c844997f4", - "revisionTime": "2016-08-17T23:34:37Z" + "revision": "a095ea2cf13223a1bf7e20efcb83edacc3a610c1", + "revisionTime": "2016-08-22T23:56:01Z" }, { "path": "github.com/minio/go-homedir",