From dd68cdd802e5fb61611ac2516f5e08c6d2ca1732 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 29 Dec 2016 19:42:02 -0800 Subject: [PATCH] Auto-reconnect for regular authRPC client. (#3506) Implement a storage-specific RPC client which does not reconnect unnecessarily; instead, reconnect is handled at a different layer for storage alone. The rest of the calls using AuthRPC automatically reconnect, i.e., upon an error equal to `rpc.ErrShutdown` they dial again and call the requested method again. --- cmd/admin-rpc-client.go | 13 +--- cmd/auth-rpc-client.go | 47 ++++++++------ cmd/bucket-metadata.go | 49 ++------------ cmd/globals.go | 4 ++ cmd/storage-rpc-client.go | 132 +++++++++++++++++++++++++++++++++----- 5 files changed, 155 insertions(+), 90 deletions(-) diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go index d454cc05a..43c9c3ae5 100644 --- a/cmd/admin-rpc-client.go +++ b/cmd/admin-rpc-client.go @@ -17,7 +17,6 @@ package cmd import ( - "net/rpc" "net/url" "path" "sync" @@ -58,22 +57,14 @@ func (lc localAdminClient) Restart() error { func (rc remoteAdminClient) Stop() error { args := GenericArgs{} reply := GenericReply{} - err := rc.Call("Service.Shutdown", &args, &reply) - if err != nil && err == rpc.ErrShutdown { - rc.Close() - } - return err + return rc.Call("Service.Shutdown", &args, &reply) } // Restart - Sends restart command to remote server via RPC. func (rc remoteAdminClient) Restart() error { args := GenericArgs{} reply := GenericReply{} - err := rc.Call("Service.Restart", &args, &reply) - if err != nil && err == rpc.ErrShutdown { - rc.Close() - } - return err + return rc.Call("Service.Restart", &args, &reply) } // adminPeer - represents an entity that implements Stop and Restart methods. 
diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 9284e0784..2e15ed8f1 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -76,7 +76,6 @@ type AuthRPCClient struct { mu sync.Mutex config *authConfig rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client - isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. serverToken string // Disk rpc JWT based token. serverVersion string // Server version exchanged by the RPC. } @@ -88,8 +87,6 @@ func newAuthClient(cfg *authConfig) *AuthRPCClient { config: cfg, // Initialize a new reconnectable rpc client. rpc: newRPCClient(cfg.address, cfg.path, cfg.secureConn), - // Allocated auth client not logged in yet. - isLoggedIn: false, } } @@ -97,7 +94,7 @@ func newAuthClient(cfg *authConfig) *AuthRPCClient { func (authClient *AuthRPCClient) Close() error { authClient.mu.Lock() // reset token on closing a connection - authClient.isLoggedIn = false + authClient.serverToken = "" authClient.mu.Unlock() return authClient.rpc.Close() } @@ -109,7 +106,7 @@ func (authClient *AuthRPCClient) Login() (err error) { defer authClient.mu.Unlock() // Return if already logged in. - if authClient.isLoggedIn { + if authClient.serverToken != "" { return nil } @@ -135,7 +132,6 @@ func (authClient *AuthRPCClient) Login() (err error) { // Set token, time stamp as received from a successful login call. authClient.serverToken = reply.Token authClient.serverVersion = reply.ServerVersion - authClient.isLoggedIn = true return nil } @@ -146,21 +142,34 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { SetToken(token string) SetTimestamp(tstamp time.Time) }, reply interface{}) (err error) { - // On successful login, attempt the call. - if err = authClient.Login(); err == nil { - // Set token and timestamp before the rpc call. 
- args.SetToken(authClient.serverToken) - args.SetTimestamp(time.Now().UTC()) + loginAndCallFn := func() error { + // On successful login, proceed to attempt the requested service method. + if err = authClient.Login(); err == nil { + // Set token and timestamp before the rpc call. + args.SetToken(authClient.serverToken) + args.SetTimestamp(time.Now().UTC()) - // Call the underlying rpc. - err = authClient.rpc.Call(serviceMethod, args, reply) - - // Invalidate token, and mark it for re-login on subsequent reconnect. - if err == rpc.ErrShutdown { - authClient.mu.Lock() - authClient.isLoggedIn = false - authClient.mu.Unlock() + // Finally make the network call using net/rpc client. + err = authClient.rpc.Call(serviceMethod, args, reply) } + return err + } + doneCh := make(chan struct{}) + defer close(doneCh) + for i := range newRetryTimer(time.Second, time.Second*30, MaxJitter, doneCh) { + // Invalidate token, and mark it for re-login and + // reconnect upon rpc shutdown. + if err = loginAndCallFn(); err == rpc.ErrShutdown { + // Close the underlying connection, and proceed to reconnect + // if we haven't reached the retry threshold. + authClient.Close() + + // No need to return error until the retry count threshold has reached. + if i < globalMaxAuthRPCRetryThreshold { + continue + } + } + break } return err } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 9339c553f..01784e6e0 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -16,10 +16,7 @@ package cmd -import ( - "encoding/json" - "net/rpc" -) +import "encoding/json" // BucketMetaState - Interface to update bucket metadata in-memory // state. @@ -112,62 +109,26 @@ type remoteBucketMetaState struct { // change to remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketNotification(args *SetBucketNotificationPeerArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketNotificationPeer", args, &reply) - // Check for network error and retry once. 
- if err != nil && err == rpc.ErrShutdown { - // Close the underlying connection to attempt once more. - rc.Close() - - // Attempt again and proceed. - err = rc.Call("S3.SetBucketNotificationPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketNotificationPeer", args, &reply) } // remoteBucketMetaState.UpdateBucketListener - sends bucket listener change to // remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketListener(args *SetBucketListenerPeerArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketListenerPeer", args, &reply) - // Check for network error and retry once. - if err != nil && err == rpc.ErrShutdown { - // Close the underlying connection to attempt once more. - rc.Close() - - // Attempt again and proceed. - err = rc.Call("S3.SetBucketListenerPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketListenerPeer", args, &reply) } // remoteBucketMetaState.UpdateBucketPolicy - sends bucket policy change to remote // peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketPolicy(args *SetBucketPolicyPeerArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketPolicyPeer", args, &reply) - // Check for network error and retry once. - if err != nil && err == rpc.ErrShutdown { - // Close the underlying connection to attempt once more. - rc.Close() - - // Attempt again and proceed. - err = rc.Call("S3.SetBucketPolicyPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketPolicyPeer", args, &reply) } // remoteBucketMetaState.SendEvent - sends event for bucket listener to remote // peer via RPC call. func (rc *remoteBucketMetaState) SendEvent(args *EventArgs) error { reply := GenericReply{} - err := rc.Call("S3.Event", args, &reply) - // Check for network error and retry once. - if err != nil && err == rpc.ErrShutdown { - // Close the underlying connection to attempt once more. - rc.Close() - - // Attempt again and proceed. 
- err = rc.Call("S3.Event", args, &reply) - } - return err + return rc.Call("S3.Event", args, &reply) } diff --git a/cmd/globals.go b/cmd/globals.go index 613d359a3..fad83d694 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -93,6 +93,10 @@ var ( // giving up on the remote disk entirely. globalMaxStorageRetryThreshold = 3 + // Attempt to retry only this many number of times before + // giving up on the remote RPC entirely. + globalMaxAuthRPCRetryThreshold = 1 + // Add new variable global values here. ) diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index fadf66b0c..4fd0c85f9 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -23,7 +23,9 @@ import ( "net/rpc" "net/url" "path" + "sync" "sync/atomic" + "time" "github.com/minio/minio/pkg/disk" ) @@ -32,7 +34,7 @@ type networkStorage struct { networkIOErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG netAddr string netPath string - rpcClient *AuthRPCClient + rpcClient *storageRPCClient } const ( @@ -97,6 +99,104 @@ func toStorageErr(err error) error { return err } +// storageRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. +type storageRPCClient struct { + sync.Mutex + cfg storageConfig + rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client + serverToken string // Disk rpc JWT based token. + serverVersion string // Server version exchanged by the RPC. +} + +// Storage config represents authentication credentials and Login +// method name to be used for fetching JWT tokens from the storage +// server. +type storageConfig struct { + addr string // Network address path of storage RPC server. + path string // Network storage path for HTTP dial. + secureConn bool // Indicates if this storage RPC is on a secured connection. + creds credential +} + +// newStorageClient - returns a jwt based authenticated (go) storage rpc client. 
+func newStorageClient(storageCfg storageConfig) *storageRPCClient { + return &storageRPCClient{ + // Save the config. + cfg: storageCfg, + rpc: newRPCClient(storageCfg.addr, storageCfg.path, storageCfg.secureConn), + } +} + +// Close - closes underlying rpc connection. +func (storageClient *storageRPCClient) Close() error { + storageClient.Lock() + // reset token on closing a connection + storageClient.serverToken = "" + storageClient.Unlock() + return storageClient.rpc.Close() +} + +// Login - a jwt based authentication is performed with rpc server. +func (storageClient *storageRPCClient) Login() (err error) { + storageClient.Lock() + // As soon as the function returns unlock, + defer storageClient.Unlock() + + // Return if token is already set. + if storageClient.serverToken != "" { + return nil + } + + reply := RPCLoginReply{} + if err = storageClient.rpc.Call("Storage.LoginHandler", RPCLoginArgs{ + Username: storageClient.cfg.creds.AccessKey, + Password: storageClient.cfg.creds.SecretKey, + }, &reply); err != nil { + return err + } + + // Validate if version do indeed match. + if reply.ServerVersion != Version { + return errServerVersionMismatch + } + + // Validate if server timestamp is skewed. + curTime := time.Now().UTC() + if curTime.Sub(reply.Timestamp) > globalMaxSkewTime { + return errServerTimeMismatch + } + + // Set token, time stamp as received from a successful login call. + storageClient.serverToken = reply.Token + storageClient.serverVersion = reply.ServerVersion + return nil +} + +// Call - If rpc connection isn't established yet since previous disconnect, +// connection is established, a jwt authenticated login is performed and then +// the call is performed. +func (storageClient *storageRPCClient) Call(serviceMethod string, args interface { + SetToken(token string) + SetTimestamp(tstamp time.Time) +}, reply interface{}) (err error) { + // On successful login, attempt the call. 
+ if err = storageClient.Login(); err != nil { + return err + } + // Set token and timestamp before the rpc call. + args.SetToken(storageClient.serverToken) + args.SetTimestamp(time.Now().UTC()) + + // Call the underlying rpc. + err = storageClient.rpc.Call(serviceMethod, args, reply) + + // Invalidate token, and mark it for re-login. + if err == rpc.ErrShutdown { + storageClient.Close() + } + return err +} + // Initialize new storage rpc client. func newStorageRPC(ep *url.URL) (StorageAPI, error) { if ep == nil { @@ -108,28 +208,28 @@ func newStorageRPC(ep *url.URL) (StorageAPI, error) { rpcAddr := ep.Host // Initialize rpc client with network address and rpc path. - accessKeyID := serverConfig.GetCredential().AccessKey - secretAccessKey := serverConfig.GetCredential().SecretKey + accessKey := serverConfig.GetCredential().AccessKey + secretKey := serverConfig.GetCredential().SecretKey if ep.User != nil { - accessKeyID = ep.User.Username() + accessKey = ep.User.Username() if key, set := ep.User.Password(); set { - secretAccessKey = key + secretKey = key } } - rpcClient := newAuthClient(&authConfig{ - accessKey: accessKeyID, - secretKey: secretAccessKey, - secureConn: isSSL(), - address: rpcAddr, - path: rpcPath, - loginMethod: "Storage.LoginHandler", - }) // Initialize network storage. ndisk := &networkStorage{ - netAddr: ep.Host, - netPath: getPath(ep), - rpcClient: rpcClient, + netAddr: ep.Host, + netPath: getPath(ep), + rpcClient: newStorageClient(storageConfig{ + addr: rpcAddr, + path: rpcPath, + creds: credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }, + secureConn: isSSL(), + }), } // Returns successfully here.