From 6d10f4c19af6861e4de1b22ac20a3e5136f69d67 Mon Sep 17 00:00:00 2001 From: "Bala.FA" Date: Fri, 23 Dec 2016 20:42:19 +0530 Subject: [PATCH] Adopt dsync interface changes and major cleanup on RPC server/client. * Rename GenericArgs to AuthRPCArgs * Rename GenericReply to AuthRPCReply * Remove authConfig.loginMethod and add authConfig.ServiceName * Rename loginServer to AuthRPCServer * Rename RPCLoginArgs to LoginRPCArgs * Rename RPCLoginReply to LoginRPCReply * Version and RequestTime are added to LoginRPCArgs and verified by server side, not client side. * Fix data race in lockMaintainence loop. --- cmd/admin-rpc-client.go | 23 +- cmd/admin-rpc-server.go | 16 +- cmd/admin-rpc-server_test.go | 52 ++--- cmd/auth-rpc-client.go | 202 ++++++---------- cmd/auth-rpc-client_test.go | 44 ++-- cmd/{login-server.go => auth-rpc-server.go} | 22 +- cmd/auth-rpc-server_test.go | 117 ++++++++++ cmd/browser-peer-rpc.go | 42 ++-- cmd/browser-peer-rpc_test.go | 56 ++--- cmd/browser-rpc-router.go | 4 +- cmd/bucket-metadata.go | 8 +- cmd/bucket-notification-handlers.go | 3 +- cmd/event-notifier_test.go | 10 + cmd/lock-rpc-client.go | 71 ++++++ cmd/lock-rpc-server-common.go | 14 -- cmd/lock-rpc-server.go | 187 +++++++-------- cmd/lock-rpc-server_test.go | 216 ++++++++++-------- cmd/login-server_test.go | 67 ------ cmd/namespace-lock.go | 21 +- cmd/net-rpc-client.go | 88 ++++--- cmd/retry-storage.go | 2 + cmd/rpc-common.go | 111 +++++++++ cmd/s3-peer-client.go | 15 +- cmd/s3-peer-router.go | 4 +- cmd/s3-peer-rpc-handlers.go | 36 ++- cmd/s3-peer-rpc-handlers_test.go | 30 +-- cmd/storage-rpc-client.go | 162 +++---------- cmd/storage-rpc-server-datatypes.go | 18 +- cmd/storage-rpc-server.go | 82 ++++--- cmd/storage-rpc-server_test.go | 63 ++--- cmd/test-utils_test.go | 9 +- cmd/utils_test.go | 6 + cmd/web-handlers.go | 2 +- cmd/web-handlers_test.go | 14 +- vendor/github.com/minio/dsync/README.md | 2 +- vendor/github.com/minio/dsync/drwmutex.go | 121 +++++----- vendor/github.com/minio/dsync/dsync.go | 30 ++- .../minio/dsync/rpc-client-interface.go | 56 ++++- vendor/vendor.json | 6 +- 39 files changed, 1083 insertions(+), 949 deletions(-) rename cmd/{login-server.go => auth-rpc-server.go} (61%) create mode 100644 cmd/auth-rpc-server_test.go create mode 100644 cmd/lock-rpc-client.go delete mode 100644 cmd/login-server_test.go create mode 100644 cmd/rpc-common.go diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go index 43c9c3ae5..14f65626d 100644 --- a/cmd/admin-rpc-client.go +++ b/cmd/admin-rpc-client.go @@ -55,15 +55,15 @@ func (lc localAdminClient) Restart() error { // Stop - Sends stop command to remote server via RPC. func (rc remoteAdminClient) Stop() error { - args := GenericArgs{} - reply := GenericReply{} + args := AuthRPCArgs{} + reply := AuthRPCReply{} return rc.Call("Service.Shutdown", &args, &reply) } // Restart - Sends restart command to remote server via RPC. func (rc remoteAdminClient) Restart() error { - args := GenericArgs{} - reply := GenericReply{} + args := AuthRPCArgs{} + reply := AuthRPCReply{} return rc.Call("Service.Restart", &args, &reply) } @@ -90,6 +90,7 @@ func makeAdminPeers(eps []*url.URL) adminPeers { }) seenAddr[globalMinioAddr] = true + serverCred := serverConfig.GetCredential() // iterate over endpoints to find new remote peers and add // them to ret. for _, ep := range eps { @@ -100,17 +101,17 @@ func makeAdminPeers(eps []*url.URL) adminPeers { // Check if the remote host has been added already if !seenAddr[ep.Host] { cfg := authConfig{ - accessKey: serverConfig.GetCredential().AccessKey, - secretKey: serverConfig.GetCredential().SecretKey, - address: ep.Host, - secureConn: isSSL(), - path: path.Join(reservedBucket, servicePath), - loginMethod: "Service.LoginHandler", + accessKey: serverCred.AccessKey, + secretKey: serverCred.SecretKey, + serverAddr: ep.Host, + secureConn: isSSL(), + serviceEndpoint: path.Join(reservedBucket, servicePath), + serviceName: "Service", } servicePeers = append(servicePeers, adminPeer{ addr: ep.Host, - svcClnt: &remoteAdminClient{newAuthClient(&cfg)}, + svcClnt: &remoteAdminClient{newAuthRPCClient(cfg)}, }) seenAddr[ep.Host] = true } diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go index af5cd0467..62751d114 100644 --- a/cmd/admin-rpc-server.go +++ b/cmd/admin-rpc-server.go @@ -27,23 +27,25 @@ const servicePath = "/admin/service" // serviceCmd - exports RPC methods for service status, stop and // restart commands. type serviceCmd struct { - loginServer + AuthRPCServer } // Shutdown - Shutdown this instance of minio server. -func (s *serviceCmd) Shutdown(args *GenericArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *serviceCmd) Shutdown(args *AuthRPCArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + globalServiceSignalCh <- serviceStop return nil } // Restart - Restart this instance of minio server. -func (s *serviceCmd) Restart(args *GenericArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *serviceCmd) Restart(args *AuthRPCArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + globalServiceSignalCh <- serviceRestart return nil } diff --git a/cmd/admin-rpc-server_test.go b/cmd/admin-rpc-server_test.go index a18f91547..1ebc24568 100644 --- a/cmd/admin-rpc-server_test.go +++ b/cmd/admin-rpc-server_test.go @@ -30,9 +30,14 @@ func testAdminCmd(cmd cmdType, t *testing.T) { adminServer := serviceCmd{} creds := serverConfig.GetCredential() - reply := RPCLoginReply{} - args := RPCLoginArgs{Username: creds.AccessKey, Password: creds.SecretKey} - err = adminServer.LoginHandler(&args, &reply) + args := LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + RequestTime: time.Now().UTC(), + } + reply := LoginRPCReply{} + err = adminServer.Login(&args, &reply) if err != nil { t.Fatalf("Failed to login to admin server - %v", err) } @@ -42,37 +47,16 @@ func testAdminCmd(cmd cmdType, t *testing.T) { <-globalServiceSignalCh }() - validToken := reply.Token - timeNow := time.Now().UTC() - testCases := []struct { - ga GenericArgs - expectedErr error - }{ - // Valid case - { - ga: GenericArgs{Token: validToken, Timestamp: timeNow}, - expectedErr: nil, - }, - // Invalid token - { - ga: GenericArgs{Token: "invalidToken", Timestamp: timeNow}, - expectedErr: errInvalidToken, - }, - } - - genReply := GenericReply{} - for i, test := range testCases { - switch cmd { - case stopCmd: - err = adminServer.Shutdown(&test.ga, &genReply) - if err != test.expectedErr { - t.Errorf("Test %d: Expected error %v but received %v", i+1, test.expectedErr, err) - } - case restartCmd: - err = adminServer.Restart(&test.ga, &genReply) - if err != test.expectedErr { - t.Errorf("Test %d: Expected error %v but received %v", i+1, test.expectedErr, err) - } + ga := AuthRPCArgs{AuthToken: reply.AuthToken, RequestTime: time.Now().UTC()} + genReply := AuthRPCReply{} + switch cmd { + case stopCmd: + if err = adminServer.Shutdown(&ga, &genReply); err != nil { + t.Errorf("stopCmd: Expected: , got: %v", err) + } + case restartCmd: + if err = adminServer.Restart(&ga, &genReply); err != nil { + t.Errorf("restartCmd: Expected: , got: %v", err) } } } diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 2dec0f58f..0df2f5286 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -26,151 +26,98 @@ import ( // giving up on the remote RPC entirely. const globalAuthRPCRetryThreshold = 1 -// GenericReply represents any generic RPC reply. -type GenericReply struct{} - -// GenericArgs represents any generic RPC arguments. -type GenericArgs struct { - Token string // Used to authenticate every RPC call. - // Used to verify if the RPC call was issued between - // the same Login() and disconnect event pair. - Timestamp time.Time - - // Indicates if args should be sent to remote peers as well. - Remote bool -} - -// SetToken - sets the token to the supplied value. -func (ga *GenericArgs) SetToken(token string) { - ga.Token = token -} - -// SetTimestamp - sets the timestamp to the supplied value. -func (ga *GenericArgs) SetTimestamp(tstamp time.Time) { - ga.Timestamp = tstamp -} - -// RPCLoginArgs - login username and password for RPC. -type RPCLoginArgs struct { - Username string - Password string -} - -// RPCLoginReply - login reply provides generated token to be used -// with subsequent requests. -type RPCLoginReply struct { - Token string - Timestamp time.Time - ServerVersion string -} - -// Auth config represents authentication credentials and Login method name to be used -// for fetching JWT tokens from the RPC server. +// authConfig requires to make new AuthRPCClient. type authConfig struct { - accessKey string // Username for the server. - secretKey string // Password for the server. - secureConn bool // Ask for a secured connection - address string // Network address path of RPC server. - path string // Network path for HTTP dial. - loginMethod string // RPC service name for authenticating using JWT + accessKey string // Access key (like username) for authentication. + secretKey string // Secret key (like Password) for authentication. + serverAddr string // RPC server address. + serviceEndpoint string // Endpoint on the server to make any RPC call. + secureConn bool // Make TLS connection to RPC server or not. + serviceName string // Service name of auth server. + disableReconnect bool // Disable reconnect on failure or not. } -// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. +// AuthRPCClient is a authenticated RPC client which does authentication before doing Call(). type AuthRPCClient struct { - mu sync.Mutex - config *authConfig - rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client - serverToken string // Disk rpc JWT based token. - serverVersion string // Server version exchanged by the RPC. + sync.Mutex // Mutex to lock this object. + rpcClient *RPCClient // Reconnectable RPC client to make any RPC call. + config authConfig // Authentication configuration information. + authToken string // Authentication token. } -// newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect. -func newAuthClient(cfg *authConfig) *AuthRPCClient { +// newAuthRPCClient - returns a JWT based authenticated (go) rpc client, which does automatic reconnect. +func newAuthRPCClient(config authConfig) *AuthRPCClient { return &AuthRPCClient{ - // Save the config. - config: cfg, - // Initialize a new reconnectable rpc client. - rpc: newRPCClient(cfg.address, cfg.path, cfg.secureConn), + rpcClient: newRPCClient(config.serverAddr, config.serviceEndpoint, config.secureConn), + config: config, } } -// Close - closes underlying rpc connection. -func (authClient *AuthRPCClient) Close() error { - authClient.mu.Lock() - // reset token on closing a connection - authClient.serverToken = "" - authClient.mu.Unlock() - return authClient.rpc.Close() -} - // Login - a jwt based authentication is performed with rpc server. func (authClient *AuthRPCClient) Login() (err error) { - authClient.mu.Lock() - // As soon as the function returns unlock, - defer authClient.mu.Unlock() + authClient.Lock() + defer authClient.Unlock() // Return if already logged in. - if authClient.serverToken != "" { + if authClient.authToken != "" { return nil } - reply := RPCLoginReply{} - if err = authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{ - Username: authClient.config.accessKey, - Password: authClient.config.secretKey, - }, &reply); err != nil { + // Call login. + args := LoginRPCArgs{ + Username: authClient.config.accessKey, + Password: authClient.config.secretKey, + Version: Version, + RequestTime: time.Now().UTC(), + } + + reply := LoginRPCReply{} + serviceMethod := authClient.config.serviceName + loginMethodName + if err = authClient.rpcClient.Call(serviceMethod, &args, &reply); err != nil { return err } - // Validate if version do indeed match. - if reply.ServerVersion != Version { - return errServerVersionMismatch - } + // Logged in successfully. + authClient.authToken = reply.AuthToken - // Validate if server timestamp is skewed. - curTime := time.Now().UTC() - if curTime.Sub(reply.Timestamp) > globalMaxSkewTime { - return errServerTimeMismatch - } - - // Set token, time stamp as received from a successful login call. - authClient.serverToken = reply.Token - authClient.serverVersion = reply.ServerVersion return nil } -// Call - If rpc connection isn't established yet since previous disconnect, -// connection is established, a jwt authenticated login is performed and then -// the call is performed. -func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) +// call makes a RPC call after logs into the server. +func (authClient *AuthRPCClient) call(serviceMethod string, args interface { + SetAuthToken(authToken string) + SetRequestTime(requestTime time.Time) }, reply interface{}) (err error) { - loginAndCallFn := func() error { - // On successful login, proceed to attempt the requested service method. - if err = authClient.Login(); err == nil { - // Set token and timestamp before the rpc call. - args.SetToken(authClient.serverToken) - args.SetTimestamp(time.Now().UTC()) + // On successful login, execute RPC call. + if err = authClient.Login(); err == nil { + // Set token and timestamp before the rpc call. + args.SetAuthToken(authClient.authToken) + args.SetRequestTime(time.Now().UTC()) - // Finally make the network call using net/rpc client. - err = authClient.rpc.Call(serviceMethod, args, reply) - } - return err + // Do RPC call. + err = authClient.rpcClient.Call(serviceMethod, args, reply) } + return err +} + +// Call executes RPC call till success or globalAuthRPCRetryThreshold on ErrShutdown. +func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { + SetAuthToken(authToken string) + SetRequestTime(requestTime time.Time) +}, reply interface{}) (err error) { doneCh := make(chan struct{}) defer close(doneCh) - for i := range newRetryTimer(time.Second, time.Second*30, MaxJitter, doneCh) { - // Invalidate token, and mark it for re-login and - // reconnect upon rpc shutdown. - if err = loginAndCallFn(); err == rpc.ErrShutdown { - // Close the underlying connection, and proceed to reconnect - // if we haven't reached the retry threshold. + for i := range newRetryTimer(time.Second, 30*time.Second, MaxJitter, doneCh) { + if err = authClient.call(serviceMethod, args, reply); err == rpc.ErrShutdown { + // As connection at server side is closed, close the rpc client. authClient.Close() - // No need to return error until the retry count threshold has reached. - if i < globalAuthRPCRetryThreshold { - continue + // Retry if reconnect is not disabled. + if !authClient.config.disableReconnect { + // Retry until threshold reaches. + if i < globalAuthRPCRetryThreshold { + continue + } } } break @@ -178,18 +125,21 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { return err } -// Node returns the node (network address) of the connection -func (authClient *AuthRPCClient) Node() (node string) { - if authClient.rpc != nil { - node = authClient.rpc.node - } - return node +// Close closes underlying RPC Client. +func (authClient *AuthRPCClient) Close() error { + authClient.Lock() + defer authClient.Unlock() + + authClient.authToken = "" + return authClient.rpcClient.Close() } -// RPCPath returns the RPC path of the connection -func (authClient *AuthRPCClient) RPCPath() (rpcPath string) { - if authClient.rpc != nil { - rpcPath = authClient.rpc.rpcPath - } - return rpcPath +// ServerAddr returns the serverAddr (network address) of the connection. +func (authClient *AuthRPCClient) ServerAddr() string { + return authClient.config.serverAddr +} + +// ServiceEndpoint returns the RPC service endpoint of the connection. +func (authClient *AuthRPCClient) ServiceEndpoint() string { + return authClient.config.serviceEndpoint } diff --git a/cmd/auth-rpc-client_test.go b/cmd/auth-rpc-client_test.go index fb27d4822..9b9076d7f 100644 --- a/cmd/auth-rpc-client_test.go +++ b/cmd/auth-rpc-client_test.go @@ -20,32 +20,32 @@ import "testing" // Tests authorized RPC client. func TestAuthRPCClient(t *testing.T) { - authCfg := &authConfig{ + authCfg := authConfig{ + accessKey: "123", + secretKey: "123", + serverAddr: "localhost:9000", + serviceEndpoint: "/rpc/disk", + secureConn: false, + serviceName: "MyPackage", + } + authRPC := newAuthRPCClient(authCfg) + if authRPC.ServerAddr() != authCfg.serverAddr { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.ServerAddr(), authCfg.serverAddr) + } + if authRPC.ServiceEndpoint() != authCfg.serviceEndpoint { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.ServiceEndpoint(), authCfg.serviceEndpoint) + } + authCfg = authConfig{ accessKey: "123", secretKey: "123", secureConn: false, - address: "localhost:9000", - path: "/rpc/disk", - loginMethod: "MyPackage.LoginHandler", + serviceName: "MyPackage", } - authRPC := newAuthClient(authCfg) - if authRPC.Node() != authCfg.address { - t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address) + authRPC = newAuthRPCClient(authCfg) + if authRPC.ServerAddr() != authCfg.serverAddr { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.ServerAddr(), authCfg.serverAddr) } - if authRPC.RPCPath() != authCfg.path { - t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path) - } - authCfg = &authConfig{ - accessKey: "123", - secretKey: "123", - secureConn: false, - loginMethod: "MyPackage.LoginHandler", - } - authRPC = newAuthClient(authCfg) - if authRPC.Node() != authCfg.address { - t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address) - } - if authRPC.RPCPath() != authCfg.path { - t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path) + if authRPC.ServiceEndpoint() != authCfg.serviceEndpoint { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.ServiceEndpoint(), authCfg.serviceEndpoint) } } diff --git a/cmd/login-server.go b/cmd/auth-rpc-server.go similarity index 61% rename from cmd/login-server.go rename to cmd/auth-rpc-server.go index 223288609..05a62b826 100644 --- a/cmd/login-server.go +++ b/cmd/auth-rpc-server.go @@ -16,20 +16,28 @@ package cmd -import "time" +// Base login method name. It should be used along with service name. +const loginMethodName = ".Login" -type loginServer struct { +// AuthRPCServer RPC server authenticates using JWT. +type AuthRPCServer struct { } -// LoginHandler - Handles JWT based RPC logic. -func (b loginServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { +// Login - Handles JWT based RPC login. +func (b AuthRPCServer) Login(args *LoginRPCArgs, reply *LoginRPCReply) error { + // Validate LoginRPCArgs + if err := args.IsValid(); err != nil { + return err + } + + // Authenticate using JWT. token, err := authenticateNode(args.Username, args.Password) if err != nil { return err } - reply.Token = token - reply.Timestamp = time.Now().UTC() - reply.ServerVersion = Version + // Return the token. + reply.AuthToken = token + return nil } diff --git a/cmd/auth-rpc-server_test.go b/cmd/auth-rpc-server_test.go new file mode 100644 index 000000000..e95c98340 --- /dev/null +++ b/cmd/auth-rpc-server_test.go @@ -0,0 +1,117 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "testing" + "time" +) + +func TestLogin(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Failed to create test config - %v", err) + } + defer removeAll(rootPath) + creds := serverConfig.GetCredential() + ls := AuthRPCServer{} + testCases := []struct { + args LoginRPCArgs + skewTime time.Duration + expectedErr error + }{ + // Valid case. + { + args: LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + }, + skewTime: 0, + expectedErr: nil, + }, + // Valid username, password and request time, not version. + { + args: LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: "INVALID-" + Version, + }, + skewTime: 0, + expectedErr: errServerVersionMismatch, + }, + // Valid username, password and version, not request time + { + args: LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + }, + skewTime: 20 * time.Minute, + expectedErr: errServerTimeMismatch, + }, + // Invalid username length + { + args: LoginRPCArgs{ + Username: "aaa", + Password: "minio123", + Version: Version, + }, + skewTime: 0, + expectedErr: errInvalidAccessKeyLength, + }, + // Invalid password length + { + args: LoginRPCArgs{ + Username: "minio", + Password: "aaa", + Version: Version, + }, + skewTime: 0, + expectedErr: errInvalidSecretKeyLength, + }, + // Invalid username + { + args: LoginRPCArgs{ + Username: "aaaaa", + Password: creds.SecretKey, + Version: Version, + }, + skewTime: 0, + expectedErr: errInvalidAccessKeyID, + }, + // Invalid password + { + args: LoginRPCArgs{ + Username: creds.AccessKey, + Password: "aaaaaaaa", + Version: Version, + }, + skewTime: 0, + expectedErr: errAuthentication, + }, + } + for i, test := range testCases { + reply := LoginRPCReply{} + test.args.RequestTime = time.Now().Add(test.skewTime).UTC() + err := ls.Login(&test.args, &reply) + if err != test.expectedErr { + t.Errorf("Test %d: Expected error %v but received %v", + i+1, test.expectedErr, err) + } + } +} diff --git a/cmd/browser-peer-rpc.go b/cmd/browser-peer-rpc.go index 17d9254f6..3af174d06 100644 --- a/cmd/browser-peer-rpc.go +++ b/cmd/browser-peer-rpc.go @@ -24,22 +24,28 @@ import ( // Login handler implements JWT login token generator, which upon login request // along with username and password is generated. -func (br *browserPeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { +func (br *browserPeerAPIHandlers) Login(args *LoginRPCArgs, reply *LoginRPCReply) error { + // Validate LoginRPCArgs + if err := args.IsValid(); err != nil { + return err + } + + // Authenticate using JWT. token, err := authenticateWeb(args.Username, args.Password) if err != nil { return err } - reply.Token = token - reply.ServerVersion = Version - reply.Timestamp = time.Now().UTC() + // Return the token. + reply.AuthToken = token + return nil } // SetAuthPeerArgs - Arguments collection for SetAuth RPC call type SetAuthPeerArgs struct { // For Auth - GenericArgs + AuthRPCArgs // New credentials that receiving peer should update to. Creds credential @@ -52,10 +58,9 @@ type SetAuthPeerArgs struct { // will be forced to re-establish connections. Connections will be // re-established only when the sending client has also updated its // credentials. -func (br *browserPeerAPIHandlers) SetAuthPeer(args SetAuthPeerArgs, reply *GenericReply) error { - // Check auth - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (br *browserPeerAPIHandlers) SetAuthPeer(args SetAuthPeerArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } // Update credentials in memory @@ -82,6 +87,7 @@ func updateCredsOnPeers(creds credential) map[string]error { errs := make([]error, len(peers)) var wg sync.WaitGroup + serverCred := serverConfig.GetCredential() // Launch go routines to send request to each peer in parallel. for ix := range peers { wg.Add(1) @@ -96,13 +102,13 @@ func updateCredsOnPeers(creds credential) map[string]error { } // Initialize client - client := newAuthClient(&authConfig{ - accessKey: serverConfig.GetCredential().AccessKey, - secretKey: serverConfig.GetCredential().SecretKey, - address: peers[ix], - secureConn: isSSL(), - path: path.Join(reservedBucket, browserPeerPath), - loginMethod: "Browser.LoginHandler", + client := newAuthRPCClient(authConfig{ + accessKey: serverCred.AccessKey, + secretKey: serverCred.SecretKey, + serverAddr: peers[ix], + secureConn: isSSL(), + serviceEndpoint: path.Join(reservedBucket, browserPeerPath), + serviceName: "Browser", }) // Construct RPC call arguments. @@ -110,14 +116,14 @@ func updateCredsOnPeers(creds credential) map[string]error { // Make RPC call - we only care about error // response and not the reply. - err := client.Call("Browser.SetAuthPeer", &args, &GenericReply{}) + err := client.Call("Browser.SetAuthPeer", &args, &AuthRPCReply{}) // We try a bit hard (3 attempts with 1 second delay) // to set creds on peers in case of failure. if err != nil { for i := 0; i < 2; i++ { time.Sleep(1 * time.Second) // 1 second delay. - err = client.Call("Browser.SetAuthPeer", &args, &GenericReply{}) + err = client.Call("Browser.SetAuthPeer", &args, &AuthRPCReply{}) if err == nil { break } diff --git a/cmd/browser-peer-rpc_test.go b/cmd/browser-peer-rpc_test.go index 4adfd4abf..5554c4f32 100644 --- a/cmd/browser-peer-rpc_test.go +++ b/cmd/browser-peer-rpc_test.go @@ -19,24 +19,25 @@ package cmd import ( "path" "testing" + "time" ) // API suite container common to both FS and XL. type TestRPCBrowserPeerSuite struct { serverType string testServer TestServer - testAuthConf *authConfig + testAuthConf authConfig } // Setting up the test suite and starting the Test server. func (s *TestRPCBrowserPeerSuite) SetUpSuite(c *testing.T) { s.testServer = StartTestBrowserPeerRPCServer(c, s.serverType) - s.testAuthConf = &authConfig{ - address: s.testServer.Server.Listener.Addr().String(), - accessKey: s.testServer.AccessKey, - secretKey: s.testServer.SecretKey, - path: path.Join(reservedBucket, browserPeerPath), - loginMethod: "BrowserPeer.LoginHandler", + s.testAuthConf = authConfig{ + serverAddr: s.testServer.Server.Listener.Addr().String(), + accessKey: s.testServer.AccessKey, + secretKey: s.testServer.SecretKey, + serviceEndpoint: path.Join(reservedBucket, browserPeerPath), + serviceName: "BrowserPeer", } } @@ -69,10 +70,10 @@ func (s *TestRPCBrowserPeerSuite) testBrowserPeerRPC(t *testing.T) { // Validate for invalid token. args := SetAuthPeerArgs{Creds: creds} - args.Token = "garbage" - rclient := newRPCClient(s.testAuthConf.address, s.testAuthConf.path, false) + args.AuthToken = "garbage" + rclient := newRPCClient(s.testAuthConf.serverAddr, s.testAuthConf.serviceEndpoint, false) defer rclient.Close() - err := rclient.Call("BrowserPeer.SetAuthPeer", &args, &GenericReply{}) + err := rclient.Call("BrowserPeer.SetAuthPeer", &args, &AuthRPCReply{}) if err != nil { if err.Error() != errInvalidToken.Error() { t.Fatal(err) @@ -81,22 +82,24 @@ func (s *TestRPCBrowserPeerSuite) testBrowserPeerRPC(t *testing.T) { // Validate for successful Peer update. args = SetAuthPeerArgs{Creds: creds} - client := newAuthClient(s.testAuthConf) + client := newAuthRPCClient(s.testAuthConf) defer client.Close() - err = client.Call("BrowserPeer.SetAuthPeer", &args, &GenericReply{}) + err = client.Call("BrowserPeer.SetAuthPeer", &args, &AuthRPCReply{}) if err != nil { t.Fatal(err) } // Validate for failure in login handler with previous credentials. - rclient = newRPCClient(s.testAuthConf.address, s.testAuthConf.path, false) + rclient = newRPCClient(s.testAuthConf.serverAddr, s.testAuthConf.serviceEndpoint, false) defer rclient.Close() - rargs := &RPCLoginArgs{ - Username: s.testAuthConf.accessKey, - Password: s.testAuthConf.secretKey, + rargs := &LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + RequestTime: time.Now().UTC(), } - rreply := &RPCLoginReply{} - err = rclient.Call("BrowserPeer.LoginHandler", rargs, rreply) + rreply := &LoginRPCReply{} + err = rclient.Call("BrowserPeer"+loginMethodName, rargs, rreply) if err != nil { if err.Error() != errInvalidAccessKeyID.Error() { t.Fatal(err) @@ -104,20 +107,19 @@ func (s *TestRPCBrowserPeerSuite) testBrowserPeerRPC(t *testing.T) { } // Validate for success in loing handled with valid credetnails. - rargs = &RPCLoginArgs{ - Username: creds.AccessKey, - Password: creds.SecretKey, + rargs = &LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + RequestTime: time.Now().UTC(), } - rreply = &RPCLoginReply{} - err = rclient.Call("BrowserPeer.LoginHandler", rargs, rreply) + rreply = &LoginRPCReply{} + err = rclient.Call("BrowserPeer"+loginMethodName, rargs, rreply) if err != nil { t.Fatal(err) } // Validate all the replied fields after successful login. - if rreply.Token == "" { + if rreply.AuthToken == "" { t.Fatalf("Generated token cannot be empty %s", errInvalidToken) } - if rreply.Timestamp.IsZero() { - t.Fatal("Time stamp returned cannot be zero") - } } diff --git a/cmd/browser-rpc-router.go b/cmd/browser-rpc-router.go index 74007f92e..f7b762441 100644 --- a/cmd/browser-rpc-router.go +++ b/cmd/browser-rpc-router.go @@ -31,7 +31,9 @@ const ( ) // The Type exporting methods exposed for RPC calls. -type browserPeerAPIHandlers struct{} +type browserPeerAPIHandlers struct { + AuthRPCServer +} // Register RPC router func registerBrowserPeerRPCRouter(mux *router.Router) error { diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 01784e6e0..fb23b9211 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -108,27 +108,27 @@ type remoteBucketMetaState struct { // remoteBucketMetaState.UpdateBucketNotification - sends bucket notification // change to remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketNotification(args *SetBucketNotificationPeerArgs) error { - reply := GenericReply{} + reply := AuthRPCReply{} return rc.Call("S3.SetBucketNotificationPeer", args, &reply) } // remoteBucketMetaState.UpdateBucketListener - sends bucket listener change to // remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketListener(args *SetBucketListenerPeerArgs) error { - reply := GenericReply{} + reply := AuthRPCReply{} return rc.Call("S3.SetBucketListenerPeer", args, &reply) } // remoteBucketMetaState.UpdateBucketPolicy - sends bucket policy change to remote // peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketPolicy(args *SetBucketPolicyPeerArgs) error { - reply := GenericReply{} + reply := AuthRPCReply{} return rc.Call("S3.SetBucketPolicyPeer", args, &reply) } // remoteBucketMetaState.SendEvent - sends event for bucket listener to remote // peer via RPC call. func (rc *remoteBucketMetaState) SendEvent(args *EventArgs) error { - reply := GenericReply{} + reply := AuthRPCReply{} return rc.Call("S3.Event", args, &reply) } diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 0a3b9e8d8..b7d7414e5 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -404,8 +404,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectLayer) { listenerCfgs := globalEventNotifier.GetBucketListenerConfig(bucket) - // remove listener with matching ARN - if not found ignore and - // exit. + // remove listener with matching ARN - if not found ignore and exit. var updatedLcfgs []listenerConfig found := false for k, configuredLcfg := range listenerCfgs { diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index b255a56d1..ee842676a 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -227,6 +227,11 @@ func TestSetNGetBucketNotification(t *testing.T) { } func TestInitEventNotifier(t *testing.T) { + currentIsDistXL := globalIsDistXL + defer func() { + globalIsDistXL = currentIsDistXL + }() + s := TestPeerRPCServerData{serverType: "XL"} // setup and teardown @@ -323,6 +328,11 @@ func TestInitEventNotifier(t *testing.T) { } func TestListenBucketNotification(t *testing.T) { + currentIsDistXL := globalIsDistXL + defer func() { + globalIsDistXL = currentIsDistXL + }() + s := TestPeerRPCServerData{serverType: "XL"} // setup and teardown s.Setup(t) diff --git a/cmd/lock-rpc-client.go b/cmd/lock-rpc-client.go new file mode 100644 index 000000000..ab302e3cf --- /dev/null +++ b/cmd/lock-rpc-client.go @@ -0,0 +1,71 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "github.com/minio/dsync" + +// LockRPCClient is authenticable lock RPC client compatible to dsync.NetLocker +type LockRPCClient struct { + *AuthRPCClient +} + +// newLockRPCClient returns new lock RPC client object. +func newLockRPCClient(config authConfig) *LockRPCClient { + return &LockRPCClient{newAuthRPCClient(config)} +} + +// RLock calls read lock RPC. +func (lockRPCClient *LockRPCClient) RLock(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.RLock", &lockArgs, &reply) + return reply, err +} + +// Lock calls write lock RPC. +func (lockRPCClient *LockRPCClient) Lock(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.Lock", &lockArgs, &reply) + return reply, err +} + +// RUnlock calls read unlock RPC. +func (lockRPCClient *LockRPCClient) RUnlock(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.RUnlock", &lockArgs, &reply) + return reply, err +} + +// Unlock calls write unlock RPC. +func (lockRPCClient *LockRPCClient) Unlock(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.Unlock", &lockArgs, &reply) + return reply, err +} + +// ForceUnlock calls force unlock RPC. +func (lockRPCClient *LockRPCClient) ForceUnlock(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.ForceUnlock", &lockArgs, &reply) + return reply, err +} + +// Expired calls expired RPC. +func (lockRPCClient *LockRPCClient) Expired(args dsync.LockArgs) (reply bool, err error) { + lockArgs := newLockArgs(args) + err = lockRPCClient.AuthRPCClient.Call("Dsync.Expired", &lockArgs, &reply) + return reply, err +} diff --git a/cmd/lock-rpc-server-common.go b/cmd/lock-rpc-server-common.go index c0e9a11ad..41856172c 100644 --- a/cmd/lock-rpc-server-common.go +++ b/cmd/lock-rpc-server-common.go @@ -57,20 +57,6 @@ func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) boo return false } -// Validate lock args. -// - validate time stamp. -// - validate jwt token. -func (l *lockServer) validateLockArgs(args *LockArgs) error { - curTime := time.Now().UTC() - if curTime.Sub(args.Timestamp) > globalMaxSkewTime || args.Timestamp.Sub(curTime) > globalMaxSkewTime { - return errServerTimeMismatch - } - if !isAuthTokenValid(args.Token) { - return errInvalidToken - } - return nil -} - // getLongLivedLocks returns locks that are older than a certain time and // have not been 'checked' for validity too soon enough func getLongLivedLocks(m map[string][]lockRequesterInfo, interval time.Duration) []nameLockRequesterInfoPair { diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 6e362557c..63873d4ac 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -25,32 +25,19 @@ import ( "time" router "github.com/gorilla/mux" + "github.com/minio/dsync" ) -const lockRPCPath = "/minio/lock" -const lockMaintenanceLoop = 1 * time.Minute -const lockCheckValidityInterval = 2 * time.Minute +const ( + // Lock rpc server endpoint. + lockRPCPath = "/minio/lock" -// LockArgs besides lock name, holds Token and Timestamp for session -// authentication and validation server restart. -type LockArgs struct { - Name string - Token string - Timestamp time.Time - Node string - RPCPath string - UID string -} + // Lock maintenance interval. + lockMaintenanceInterval = 1 * time.Minute // 1 minute. -// SetToken - sets the token to the supplied value. -func (l *LockArgs) SetToken(token string) { - l.Token = token -} - -// SetTimestamp - sets the timestamp to the supplied value. -func (l *LockArgs) SetTimestamp(tstamp time.Time) { - l.Timestamp = tstamp -} + // Lock validity check interval. + lockValidityCheckInterval = 2 * time.Minute // 2 minutes. +) // lockRequesterInfo stores various info from the client for each lock that is requested type lockRequesterInfo struct { @@ -69,43 +56,61 @@ func isWriteLock(lri []lockRequesterInfo) bool { // lockServer is type for RPC handlers type lockServer struct { - loginServer + AuthRPCServer rpcPath string mutex sync.Mutex lockMap map[string][]lockRequesterInfo } +// Start lock maintenance from all lock servers. +func startLockMaintainence(lockServers []*lockServer) { + for _, locker := range lockServers { + // Start loop for stale lock maintenance + go func(lk *lockServer) { + // Initialize a new ticker with a minute between each ticks. + ticker := time.NewTicker(lockMaintenanceInterval) + + // Start with random sleep time, so as to avoid "synchronous checks" between servers + time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval))) + for { + // Verifies every minute for locks held more than 2minutes. + select { + case <-ticker.C: + lk.lockMaintenance(lockValidityCheckInterval) + case <-globalServiceDoneCh: + // Stop the timer. + ticker.Stop() + } + } + }(locker) + } +} + // Register distributed NS lock handlers. func registerDistNSLockRouter(mux *router.Router, serverConfig serverCmdConfig) error { + // Initialize a new set of lock servers. lockServers := newLockServers(serverConfig) + + // Start lock maintenance from all lock servers. + startLockMaintainence(lockServers) + + // Register initialized lock servers to their respective rpc endpoints. return registerStorageLockers(mux, lockServers) } // Create one lock server for every local storage rpc server. func newLockServers(srvConfig serverCmdConfig) (lockServers []*lockServer) { for _, ep := range srvConfig.endpoints { - // Not local storage move to the next node. - if !isLocalStorage(ep) { - continue - } - - // Create handler for lock RPCs - locker := &lockServer{ - rpcPath: getPath(ep), - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), - } - - // Start loop for stale lock maintenance - go func() { - // Start with random sleep time, so as to avoid "synchronous checks" between servers - time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceLoop))) - for { - time.Sleep(lockMaintenanceLoop) - locker.lockMaintenance(lockCheckValidityInterval) + // Initialize new lock server for each local node. + if isLocalStorage(ep) { + // Create handler for lock RPCs + locker := &lockServer{ + rpcPath: getPath(ep), + mutex: sync.Mutex{}, + lockMap: make(map[string][]lockRequesterInfo), } - }() - lockServers = append(lockServers, locker) + lockServers = append(lockServers, locker) + } } return lockServers } @@ -114,8 +119,7 @@ func newLockServers(srvConfig serverCmdConfig) (lockServers []*lockServer) { func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error { for _, lockServer := range lockServers { lockRPCServer := rpc.NewServer() - err := lockRPCServer.RegisterName("Dsync", lockServer) - if err != nil { + if err := lockRPCServer.RegisterName("Dsync", lockServer); err != nil { return traceError(err) } lockRouter := mux.PathPrefix(reservedBucket).Subrouter() @@ -130,17 +134,17 @@ func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } - _, *reply = l.lockMap[args.Name] + _, *reply = l.lockMap[args.dsyncLockArgs.Resource] if !*reply { // No locks held on the given name, so claim write lock - l.lockMap[args.Name] = []lockRequesterInfo{ + l.lockMap[args.dsyncLockArgs.Resource] = []lockRequesterInfo{ { writer: true, - node: args.Node, - rpcPath: args.RPCPath, - uid: args.UID, + node: args.dsyncLockArgs.ServerAddr, + rpcPath: args.dsyncLockArgs.ServiceEndpoint, + uid: args.dsyncLockArgs.UID, timestamp: time.Now().UTC(), timeLastCheck: time.Now().UTC(), }, @@ -154,18 +158,18 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error { func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } var lri []lockRequesterInfo - if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name - return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name) + if lri, *reply = l.lockMap[args.dsyncLockArgs.Resource]; !*reply { // No lock is held on the given name + return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.dsyncLockArgs.Resource) } if *reply = isWriteLock(lri); !*reply { // Unless it is a write lock - return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(lri)) + return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.dsyncLockArgs.Resource, len(lri)) } - if !l.removeEntry(args.Name, args.UID, &lri) { - return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID) + if !l.removeEntry(args.dsyncLockArgs.Resource, args.dsyncLockArgs.UID, &lri) { + return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.dsyncLockArgs.UID) } return nil } @@ -174,23 +178,23 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } lrInfo := lockRequesterInfo{ writer: false, - node: args.Node, - rpcPath: args.RPCPath, - uid: args.UID, + node: args.dsyncLockArgs.ServerAddr, + rpcPath: args.dsyncLockArgs.ServiceEndpoint, + uid: args.dsyncLockArgs.UID, timestamp: time.Now().UTC(), timeLastCheck: time.Now().UTC(), } - if lri, ok := l.lockMap[args.Name]; ok { + if lri, ok := l.lockMap[args.dsyncLockArgs.Resource]; ok { if *reply = !isWriteLock(lri); *reply { // Unless there is a write lock - l.lockMap[args.Name] = append(l.lockMap[args.Name], lrInfo) + l.lockMap[args.dsyncLockArgs.Resource] = append(l.lockMap[args.dsyncLockArgs.Resource], lrInfo) } } else { // No locks held on the given name, so claim (first) read lock - l.lockMap[args.Name] = []lockRequesterInfo{lrInfo} + l.lockMap[args.dsyncLockArgs.Resource] = []lockRequesterInfo{lrInfo} *reply = true } return nil @@ -200,18 +204,18 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } var lri []lockRequesterInfo - if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name - return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Name) + if lri, *reply = l.lockMap[args.dsyncLockArgs.Resource]; !*reply { // No lock is held on the given name + return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.dsyncLockArgs.Resource) } if *reply = !isWriteLock(lri); !*reply { // A write-lock is held, cannot release a read lock - return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name) + return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.dsyncLockArgs.Resource) } - if !l.removeEntry(args.Name, args.UID, &lri) { - return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID) + if !l.removeEntry(args.dsyncLockArgs.Resource, args.dsyncLockArgs.UID, &lri) { + return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.dsyncLockArgs.UID) } return nil } @@ -220,14 +224,14 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } - if len(args.UID) != 0 { - return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) + if len(args.dsyncLockArgs.UID) != 0 { + return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.dsyncLockArgs.UID) } - if _, ok := l.lockMap[args.Name]; ok { // Only clear lock when set - delete(l.lockMap, args.Name) // Remove the lock (irrespective of write or read lock) + if _, ok := l.lockMap[args.dsyncLockArgs.Resource]; ok { // Only clear lock when set + delete(l.lockMap, args.dsyncLockArgs.Resource) // Remove the lock (irrespective of write or read lock) } *reply = true return nil @@ -237,21 +241,21 @@ func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { func (l *lockServer) Expired(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.validateLockArgs(args); err != nil { + if err := args.IsAuthenticated(); err != nil { return err } // Lock found, proceed to verify if belongs to given uid. - if lri, ok := l.lockMap[args.Name]; ok { + if lri, ok := l.lockMap[args.dsyncLockArgs.Resource]; ok { // Check whether uid is still active for _, entry := range lri { - if entry.uid == args.UID { + if entry.uid == args.dsyncLockArgs.UID { *reply = false // When uid found, lock is still active so return not expired. return nil // When uid found *reply is set to true. } } } - // When we get here lock is no longer active due to either args.Name - // being absent from map or uid not found for given args.Name + // When we get here lock is no longer active due to either args.dsyncLockArgs.Resource + // being absent from map or uid not found for given args.dsyncLockArgs.Resource *reply = true return nil } @@ -276,19 +280,24 @@ func (l *lockServer) lockMaintenance(interval time.Duration) { nlripLongLived := getLongLivedLocks(l.lockMap, interval) l.mutex.Unlock() + serverCred := serverConfig.GetCredential() // Validate if long lived locks are indeed clean. for _, nlrip := range nlripLongLived { // Initialize client based on the long live locks. - c := newRPCClient(nlrip.lri.node, nlrip.lri.rpcPath, isSSL()) - - var expired bool + c := newLockRPCClient(authConfig{ + accessKey: serverCred.AccessKey, + secretKey: serverCred.SecretKey, + serverAddr: nlrip.lri.node, + serviceEndpoint: nlrip.lri.rpcPath, + secureConn: isSSL(), + serviceName: "Dsync", + }) // Call back to original server verify whether the lock is still active (based on name & uid) - c.Call("Dsync.Expired", &LockArgs{ - Name: nlrip.name, - UID: nlrip.lri.uid, - }, &expired) - c.Close() // Close the connection regardless of the call response. + expired, _ := c.Expired(dsync.LockArgs{UID: nlrip.lri.uid, Resource: nlrip.name}) + + // Close the connection regardless of the call response. + c.rpcClient.Close() // For successful response, verify if lock is indeed active or stale. if expired { diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 27098309e..eb8bebd31 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -22,6 +22,8 @@ import ( "sync" "testing" "time" + + "github.com/minio/dsync" ) // Helper function to test equality of locks (without taking timing info into account) @@ -49,38 +51,41 @@ func createLockTestServer(t *testing.T) (string, *lockServer, string) { } locker := &lockServer{ - loginServer: loginServer{}, - rpcPath: "rpc-path", - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), + AuthRPCServer: AuthRPCServer{}, + rpcPath: "rpc-path", + mutex: sync.Mutex{}, + lockMap: make(map[string][]lockRequesterInfo), } creds := serverConfig.GetCredential() - loginArgs := RPCLoginArgs{Username: creds.AccessKey, Password: creds.SecretKey} - loginReply := RPCLoginReply{} - err = locker.LoginHandler(&loginArgs, &loginReply) + loginArgs := LoginRPCArgs{ + Username: creds.AccessKey, + Password: creds.SecretKey, + Version: Version, + RequestTime: time.Now().UTC(), + } + loginReply := LoginRPCReply{} + err = locker.Login(&loginArgs, &loginReply) if err != nil { t.Fatalf("Failed to login to lock server - %v", err) } - token := loginReply.Token + token := loginReply.AuthToken return testPath, locker, token } // Test Lock functionality func TestLockRpcServerLock(t *testing.T) { - - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // Claim a lock var result bool @@ -107,14 +112,15 @@ func TestLockRpcServerLock(t *testing.T) { } // Try to claim same lock again (will fail) - la2 := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "89ab-cdef", - } + la2 := newLockArgs(dsync.LockArgs{ + UID: "89ab-cdef", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la2.SetAuthToken(token) + la2.SetRequestTime(time.Now().UTC()) + err = locker.Lock(&la2, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -127,19 +133,17 @@ func TestLockRpcServerLock(t *testing.T) { // Test Unlock functionality func TestLockRpcServerUnlock(t *testing.T) { - - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // First test return of error when attempting to unlock a lock that does not exist var result bool @@ -149,6 +153,7 @@ func TestLockRpcServerUnlock(t *testing.T) { } // Create lock (so that we can release) + la.SetRequestTime(time.Now().UTC()) err = locker.Lock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -157,6 +162,7 @@ func TestLockRpcServerUnlock(t *testing.T) { } // Finally test successful release of lock + la.SetRequestTime(time.Now().UTC()) err = locker.Unlock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -175,19 +181,17 @@ func TestLockRpcServerUnlock(t *testing.T) { // Test RLock functionality func TestLockRpcServerRLock(t *testing.T) { - - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // Claim a lock var result bool @@ -214,14 +218,15 @@ func TestLockRpcServerRLock(t *testing.T) { } // Try to claim same again (will succeed) - la2 := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "89ab-cdef", - } + la2 := newLockArgs(dsync.LockArgs{ + UID: "89ab-cdef", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la2.SetAuthToken(token) + la2.SetRequestTime(time.Now().UTC()) + err = locker.RLock(&la2, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -234,19 +239,17 @@ func TestLockRpcServerRLock(t *testing.T) { // Test RUnlock functionality func TestLockRpcServerRUnlock(t *testing.T) { - - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // First test return of error when attempting to unlock a read-lock that does not exist var result bool @@ -256,6 +259,7 @@ func TestLockRpcServerRUnlock(t *testing.T) { } // Create first lock ... (so that we can release) + la.SetRequestTime(time.Now().UTC()) err = locker.RLock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -263,14 +267,15 @@ func TestLockRpcServerRUnlock(t *testing.T) { t.Errorf("Expected %#v, got %#v", true, result) } - la2 := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "89ab-cdef", - } + // Try to claim same again (will succeed) + la2 := newLockArgs(dsync.LockArgs{ + UID: "89ab-cdef", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la2.SetAuthToken(token) + la2.SetRequestTime(time.Now().UTC()) // ... and create a second lock on same resource err = locker.RLock(&la2, &result) @@ -281,6 +286,7 @@ func TestLockRpcServerRUnlock(t *testing.T) { } // Test successful release of first read lock + la.SetRequestTime(time.Now().UTC()) err = locker.RUnlock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -305,6 +311,7 @@ func TestLockRpcServerRUnlock(t *testing.T) { } // Finally test successful release of second (and last) read lock + la2.SetRequestTime(time.Now().UTC()) err = locker.RUnlock(&la2, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -323,19 +330,17 @@ func TestLockRpcServerRUnlock(t *testing.T) { // Test ForceUnlock functionality func TestLockRpcServerForceUnlock(t *testing.T) { - - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - laForce := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "1234-5678", - } + laForce := newLockArgs(dsync.LockArgs{ + UID: "1234-5678", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + laForce.SetAuthToken(token) + laForce.SetRequestTime(time.Now().UTC()) // First test that UID should be empty var result bool @@ -345,20 +350,21 @@ func TestLockRpcServerForceUnlock(t *testing.T) { } // Then test force unlock of a lock that does not exist (not returning an error) - laForce.UID = "" + laForce.dsyncLockArgs.UID = "" + laForce.SetRequestTime(time.Now().UTC()) err = locker.ForceUnlock(&laForce, &result) if err != nil { t.Errorf("Expected no error, got %#v", err) } - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // Create lock ... (so that we can force unlock) err = locker.Lock(&la, &result) @@ -369,12 +375,14 @@ func TestLockRpcServerForceUnlock(t *testing.T) { } // Forcefully unlock the lock (not returning an error) + laForce.SetRequestTime(time.Now().UTC()) err = locker.ForceUnlock(&laForce, &result) if err != nil { t.Errorf("Expected no error, got %#v", err) } // Try to get lock again (should be granted) + la.SetRequestTime(time.Now().UTC()) err = locker.Lock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -383,6 +391,7 @@ func TestLockRpcServerForceUnlock(t *testing.T) { } // Finally forcefully unlock the lock once again + laForce.SetRequestTime(time.Now().UTC()) err = locker.ForceUnlock(&laForce, &result) if err != nil { t.Errorf("Expected no error, got %#v", err) @@ -391,18 +400,17 @@ func TestLockRpcServerForceUnlock(t *testing.T) { // Test Expired functionality func TestLockRpcServerExpired(t *testing.T) { - timestamp := time.Now().UTC() testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) - la := LockArgs{ - Name: "name", - Token: token, - Timestamp: timestamp, - Node: "node", - RPCPath: "rpc-path", - UID: "0123-4567", - } + la := newLockArgs(dsync.LockArgs{ + UID: "0123-4567", + Resource: "name", + ServerAddr: "node", + ServiceEndpoint: "rpc-path", + }) + la.SetAuthToken(token) + la.SetRequestTime(time.Now().UTC()) // Unknown lock at server will return expired = true var expired bool @@ -417,6 +425,7 @@ func TestLockRpcServerExpired(t *testing.T) { // Create lock (so that we can test that it is not expired) var result bool + la.SetRequestTime(time.Now().UTC()) err = locker.Lock(&la, &result) if err != nil { t.Errorf("Expected %#v, got %#v", nil, err) @@ -424,6 +433,7 @@ func TestLockRpcServerExpired(t *testing.T) { t.Errorf("Expected %#v, got %#v", true, result) } + la.SetRequestTime(time.Now().UTC()) err = locker.Expired(&la, &expired) if err != nil { t.Errorf("Expected no error, got %#v", err) @@ -439,6 +449,12 @@ func TestLockServers(t *testing.T) { if runtime.GOOS == "windows" { return } + + currentIsDistXL := globalIsDistXL + defer func() { + globalIsDistXL = currentIsDistXL + }() + globalMinioHost = "" testCases := []struct { isDistXL bool diff --git a/cmd/login-server_test.go b/cmd/login-server_test.go deleted file mode 100644 index 3d8f66a97..000000000 --- a/cmd/login-server_test.go +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import "testing" - -func TestLoginHandler(t *testing.T) { - rootPath, err := newTestConfig("us-east-1") - if err != nil { - t.Fatalf("Failed to create test config - %v", err) - } - defer removeAll(rootPath) - creds := serverConfig.GetCredential() - ls := loginServer{} - testCases := []struct { - args RPCLoginArgs - expectedErr error - }{ - // Valid username and password - { - args: RPCLoginArgs{Username: creds.AccessKey, Password: creds.SecretKey}, - expectedErr: nil, - }, - // Invalid username length - { - args: RPCLoginArgs{Username: "aaa", Password: "minio123"}, - expectedErr: errInvalidAccessKeyLength, - }, - // Invalid password length - { - args: RPCLoginArgs{Username: "minio", Password: "aaa"}, - expectedErr: errInvalidSecretKeyLength, - }, - // Invalid username - { - args: RPCLoginArgs{Username: "aaaaa", Password: creds.SecretKey}, - expectedErr: errInvalidAccessKeyID, - }, - // Invalid password - { - args: RPCLoginArgs{Username: creds.AccessKey, Password: "aaaaaaaa"}, - expectedErr: errAuthentication, - }, - } - for i, test := range testCases { - reply := RPCLoginReply{} - err := ls.LoginHandler(&test.args, &reply) - if err != test.expectedErr { - t.Errorf("Test %d: Expected error %v but received %v", - i+1, test.expectedErr, err) - } - } -} diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index b59261408..cdb4f0f5c 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -33,27 +33,26 @@ var globalNSMutex *nsLockMap func initDsyncNodes(eps []*url.URL) error { cred := serverConfig.GetCredential() // Initialize rpc lock client information only if this instance is a distributed setup. - clnts := make([]dsync.RPC, len(eps)) + clnts := make([]dsync.NetLocker, len(eps)) myNode := -1 for index, ep := range eps { if ep == nil { return errInvalidArgument } - clnts[index] = newAuthClient(&authConfig{ - accessKey: cred.AccessKey, - secretKey: cred.SecretKey, - // Construct a new dsync server addr. - secureConn: isSSL(), - address: ep.Host, - // Construct a new rpc path for the endpoint. - path: pathutil.Join(lockRPCPath, getPath(ep)), - loginMethod: "Dsync.LoginHandler", + clnts[index] = newLockRPCClient(authConfig{ + accessKey: cred.AccessKey, + secretKey: cred.SecretKey, + serverAddr: ep.Host, + serviceEndpoint: pathutil.Join(lockRPCPath, getPath(ep)), + secureConn: isSSL(), + serviceName: "Dsync", }) if isLocalStorage(ep) && myNode == -1 { myNode = index } } - return dsync.SetNodesWithClients(clnts, myNode) + + return dsync.Init(clnts, myNode) } // initNSLock - initialize name space lock map. diff --git a/cmd/net-rpc-client.go b/cmd/net-rpc-client.go index 1ba1bb8e5..ad50bd10d 100644 --- a/cmd/net-rpc-client.go +++ b/cmd/net-rpc-client.go @@ -33,79 +33,83 @@ import ( // defaultDialTimeout is used for non-secure connection. const defaultDialTimeout = 3 * time.Second -// RPCClient is a wrapper type for rpc.Client which provides reconnect on first failure. +// RPCClient is a reconnectable RPC client on Call(). type RPCClient struct { - mu sync.Mutex - netRPCClient *rpc.Client - node string - rpcPath string - secureConn bool + sync.Mutex // Mutex to lock net rpc client. + netRPCClient *rpc.Client // Base RPC client to make any RPC call. + serverAddr string // RPC server address. + serviceEndpoint string // Endpoint on the server to make any RPC call. + secureConn bool // Make TLS connection to RPC server or not. } -// newClient constructs a RPCClient object with node and rpcPath initialized. +// newRPCClient returns new RPCClient object with given serverAddr and serviceEndpoint. // It does lazy connect to the remote endpoint on Call(). -func newRPCClient(node, rpcPath string, secureConn bool) *RPCClient { +func newRPCClient(serverAddr, serviceEndpoint string, secureConn bool) *RPCClient { return &RPCClient{ - node: node, - rpcPath: rpcPath, - secureConn: secureConn, + serverAddr: serverAddr, + serviceEndpoint: serviceEndpoint, + secureConn: secureConn, } } -// dial tries to establish a connection to the server in a safe manner. +// dial tries to establish a connection to serverAddr in a safe manner. // If there is a valid rpc.Cliemt, it returns that else creates a new one. -func (rpcClient *RPCClient) dial() (*rpc.Client, error) { - rpcClient.mu.Lock() - defer rpcClient.mu.Unlock() +func (rpcClient *RPCClient) dial() (netRPCClient *rpc.Client, err error) { + rpcClient.Lock() + defer rpcClient.Unlock() // Nothing to do as we already have valid connection. if rpcClient.netRPCClient != nil { return rpcClient.netRPCClient, nil } - var err error var conn net.Conn if rpcClient.secureConn { - hostname, _, splitErr := net.SplitHostPort(rpcClient.node) - if splitErr != nil { - err = errors.New("Unable to parse RPC address <" + rpcClient.node + "> : " + splitErr.Error()) - return nil, &net.OpError{ + var hostname string + if hostname, _, err = net.SplitHostPort(rpcClient.serverAddr); err != nil { + err = &net.OpError{ Op: "dial-http", - Net: rpcClient.node + " " + rpcClient.rpcPath, + Net: rpcClient.serverAddr + rpcClient.serviceEndpoint, Addr: nil, - Err: err, + Err: fmt.Errorf("Unable to parse server address <%s>: %s", rpcClient.serverAddr, err.Error()), } + + return nil, err } - // ServerName in tls.Config needs to be specified to support SNI certificates - conn, err = tls.Dial("tcp", rpcClient.node, &tls.Config{ServerName: hostname, RootCAs: globalRootCAs}) + + // ServerName in tls.Config needs to be specified to support SNI certificates. + conn, err = tls.Dial("tcp", rpcClient.serverAddr, &tls.Config{ServerName: hostname, RootCAs: globalRootCAs}) } else { - // Dial with 3 seconds timeout. - conn, err = net.DialTimeout("tcp", rpcClient.node, defaultDialTimeout) + // Dial with a timeout. + conn, err = net.DialTimeout("tcp", rpcClient.serverAddr, defaultDialTimeout) } + if err != nil { - // Print RPC connection errors that are worthy to display in log + // Print RPC connection errors that are worthy to display in log. switch err.(type) { case x509.HostnameError: - errorIf(err, "Unable to establish secure connection to %s", rpcClient.node) + errorIf(err, "Unable to establish secure connection to %s", rpcClient.serverAddr) } + return nil, &net.OpError{ Op: "dial-http", - Net: rpcClient.node + " " + rpcClient.rpcPath, + Net: rpcClient.serverAddr + rpcClient.serviceEndpoint, Addr: nil, Err: err, } } - io.WriteString(conn, "CONNECT "+rpcClient.rpcPath+" HTTP/1.0\n\n") + io.WriteString(conn, "CONNECT "+rpcClient.serviceEndpoint+" HTTP/1.0\n\n") // Require successful HTTP response before switching to RPC protocol. resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) if err == nil && resp.Status == "200 Connected to Go RPC" { netRPCClient := rpc.NewClient(conn) + if netRPCClient == nil { return nil, &net.OpError{ Op: "dial-http", - Net: rpcClient.node + " " + rpcClient.rpcPath, + Net: rpcClient.serverAddr + rpcClient.serviceEndpoint, Addr: nil, Err: fmt.Errorf("Unable to initialize new rpc.Client, %s", errUnexpected), } @@ -116,13 +120,15 @@ func (rpcClient *RPCClient) dial() (*rpc.Client, error) { return netRPCClient, nil } + conn.Close() + if err == nil { err = errors.New("unexpected HTTP response: " + resp.Status) } - conn.Close() + return nil, &net.OpError{ Op: "dial-http", - Net: rpcClient.node + " " + rpcClient.rpcPath, + Net: rpcClient.serverAddr + rpcClient.serviceEndpoint, Addr: nil, Err: err, } @@ -141,28 +147,18 @@ func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply i // Close closes underlying rpc.Client. func (rpcClient *RPCClient) Close() error { - rpcClient.mu.Lock() + rpcClient.Lock() if rpcClient.netRPCClient != nil { // We make a copy of rpc.Client and unlock it immediately so that another // goroutine could try to dial or close in parallel. netRPCClient := rpcClient.netRPCClient rpcClient.netRPCClient = nil - rpcClient.mu.Unlock() + rpcClient.Unlock() return netRPCClient.Close() } - rpcClient.mu.Unlock() + rpcClient.Unlock() return nil } - -// Node returns the node (network address) of the connection -func (rpcClient *RPCClient) Node() string { - return rpcClient.node -} - -// RPCPath returns the RPC path of the connection -func (rpcClient *RPCClient) RPCPath() string { - return rpcClient.rpcPath -} diff --git a/cmd/retry-storage.go b/cmd/retry-storage.go index c30c152a3..23a7526a3 100644 --- a/cmd/retry-storage.go +++ b/cmd/retry-storage.go @@ -233,6 +233,7 @@ func (f retryStorage) reInit() (err error) { } return err } + // Attempt to load format to see if the disk is really // a formatted disk and part of the cluster. _, err = loadFormat(f.remoteStorage) @@ -244,6 +245,7 @@ func (f retryStorage) reInit() (err error) { } return err } + // Login and loading format was a success, break and proceed forward. break } diff --git a/cmd/rpc-common.go b/cmd/rpc-common.go new file mode 100644 index 000000000..8180aad45 --- /dev/null +++ b/cmd/rpc-common.go @@ -0,0 +1,111 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "time" + + "github.com/minio/dsync" +) + +// Allow any RPC call request time should be no more/less than 3 seconds. +// 3 seconds is chosen arbitrarily. +const rpcSkewTimeAllowed = 3 * time.Second + +func isRequestTimeAllowed(requestTime time.Time) bool { + // Check whether request time is within acceptable skew time. + utcNow := time.Now().UTC() + return !(requestTime.Sub(utcNow) > rpcSkewTimeAllowed || + utcNow.Sub(requestTime) > rpcSkewTimeAllowed) +} + +// AuthRPCArgs represents minimum required arguments to make any authenticated RPC call. +type AuthRPCArgs struct { + // Authentication token to be verified by the server for every RPC call. + AuthToken string + + // Request time to be verified by the server for every RPC call. + // This is an addition check over Authentication token for time drifting. + RequestTime time.Time +} + +// SetAuthToken - sets the token to the supplied value. +func (args *AuthRPCArgs) SetAuthToken(authToken string) { + args.AuthToken = authToken +} + +// SetRequestTime - sets the requestTime to the supplied value. +func (args *AuthRPCArgs) SetRequestTime(requestTime time.Time) { + args.RequestTime = requestTime +} + +// IsAuthenticated - validated whether this auth RPC args are already authenticated or not. +func (args AuthRPCArgs) IsAuthenticated() error { + // Check whether the token is valid + if !isAuthTokenValid(args.AuthToken) { + return errInvalidToken + } + + // Check if the request time is within the allowed skew limit. + if !isRequestTimeAllowed(args.RequestTime) { + return errServerTimeMismatch + } + + // Good to go. + return nil +} + +// AuthRPCReply represents minimum required reply for any authenticated RPC call. +type AuthRPCReply struct{} + +// LoginRPCArgs - login username and password for RPC. +type LoginRPCArgs struct { + Username string + Password string + Version string + RequestTime time.Time +} + +// IsValid - validates whether this LoginRPCArgs are valid for authentication. +func (args LoginRPCArgs) IsValid() error { + // Check if version matches. + if args.Version != Version { + return errServerVersionMismatch + } + + if !isRequestTimeAllowed(args.RequestTime) { + return errServerTimeMismatch + } + + return nil +} + +// LoginRPCReply - login reply provides generated token to be used +// with subsequent requests. +type LoginRPCReply struct { + AuthToken string +} + +// LockArgs represents arguments for any authenticated lock RPC call. +type LockArgs struct { + AuthRPCArgs + dsyncLockArgs dsync.LockArgs +} + +func newLockArgs(args dsync.LockArgs) LockArgs { + return LockArgs{dsyncLockArgs: args} +} diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 670eb14bf..0f0cba687 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -52,6 +52,7 @@ func makeS3Peers(eps []*url.URL) s3Peers { }) seenAddr[globalMinioAddr] = true + serverCred := serverConfig.GetCredential() // iterate over endpoints to find new remote peers and add // them to ret. for _, ep := range eps { @@ -62,17 +63,17 @@ func makeS3Peers(eps []*url.URL) s3Peers { // Check if the remote host has been added already if !seenAddr[ep.Host] { cfg := authConfig{ - accessKey: serverConfig.GetCredential().AccessKey, - secretKey: serverConfig.GetCredential().SecretKey, - address: ep.Host, - secureConn: isSSL(), - path: path.Join(reservedBucket, s3Path), - loginMethod: "S3.LoginHandler", + accessKey: serverCred.AccessKey, + secretKey: serverCred.SecretKey, + serverAddr: ep.Host, + serviceEndpoint: path.Join(reservedBucket, s3Path), + secureConn: isSSL(), + serviceName: "S3", } ret = append(ret, s3Peer{ addr: ep.Host, - bmsClient: &remoteBucketMetaState{newAuthClient(&cfg)}, + bmsClient: &remoteBucketMetaState{newAuthRPCClient(cfg)}, }) seenAddr[ep.Host] = true } diff --git a/cmd/s3-peer-router.go b/cmd/s3-peer-router.go index e6575f967..e843c17bb 100644 --- a/cmd/s3-peer-router.go +++ b/cmd/s3-peer-router.go @@ -27,13 +27,13 @@ const ( ) type s3PeerAPIHandlers struct { - loginServer + AuthRPCServer bms BucketMetaState } func registerS3PeerRPCRouter(mux *router.Router) error { s3PeerHandlers := &s3PeerAPIHandlers{ - loginServer{}, + AuthRPCServer{}, &localBucketMetaState{ ObjectAPI: newObjectLayerFn, }, diff --git a/cmd/s3-peer-rpc-handlers.go b/cmd/s3-peer-rpc-handlers.go index 769d84f76..1f568875a 100644 --- a/cmd/s3-peer-rpc-handlers.go +++ b/cmd/s3-peer-rpc-handlers.go @@ -20,7 +20,7 @@ package cmd // call type SetBucketNotificationPeerArgs struct { // For Auth - GenericArgs + AuthRPCArgs Bucket string @@ -35,10 +35,9 @@ func (s *SetBucketNotificationPeerArgs) BucketUpdate(client BucketMetaState) err return client.UpdateBucketNotification(s) } -func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBucketNotificationPeerArgs, reply *GenericReply) error { - // check auth - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBucketNotificationPeerArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } return s3.bms.UpdateBucketNotification(args) @@ -47,7 +46,7 @@ func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBucketNotificati // SetBucketListenerPeerArgs - Arguments collection to SetBucketListenerPeer RPC call type SetBucketListenerPeerArgs struct { // For Auth - GenericArgs + AuthRPCArgs Bucket string @@ -62,10 +61,9 @@ func (s *SetBucketListenerPeerArgs) BucketUpdate(client BucketMetaState) error { return client.UpdateBucketListener(s) } -func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBucketListenerPeerArgs, reply *GenericReply) error { - // check auth - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBucketListenerPeerArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } return s3.bms.UpdateBucketListener(args) @@ -74,7 +72,7 @@ func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBucketListenerPeerAr // EventArgs - Arguments collection for Event RPC call type EventArgs struct { // For Auth - GenericArgs + AuthRPCArgs // event being sent Event []NotificationEvent @@ -84,10 +82,9 @@ type EventArgs struct { } // submit an event to the receiving server. -func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error { - // check auth - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } return s3.bms.SendEvent(args) @@ -96,7 +93,7 @@ func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error { // SetBucketPolicyPeerArgs - Arguments collection for SetBucketPolicyPeer RPC call type SetBucketPolicyPeerArgs struct { // For Auth - GenericArgs + AuthRPCArgs Bucket string @@ -112,10 +109,9 @@ func (s *SetBucketPolicyPeerArgs) BucketUpdate(client BucketMetaState) error { } // tell receiving server to update a bucket policy -func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBucketPolicyPeerArgs, reply *GenericReply) error { - // check auth - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBucketPolicyPeerArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } return s3.bms.UpdateBucketPolicy(args) diff --git a/cmd/s3-peer-rpc-handlers_test.go b/cmd/s3-peer-rpc-handlers_test.go index ff46414bc..a9e813cb7 100644 --- a/cmd/s3-peer-rpc-handlers_test.go +++ b/cmd/s3-peer-rpc-handlers_test.go @@ -25,19 +25,19 @@ import ( type TestRPCS3PeerSuite struct { testServer TestServer - testAuthConf *authConfig + testAuthConf authConfig disks []string } // Set up the suite and start the test server. func (s *TestRPCS3PeerSuite) SetUpSuite(t *testing.T) { s.testServer, s.disks = StartTestS3PeerRPCServer(t) - s.testAuthConf = &authConfig{ - address: s.testServer.Server.Listener.Addr().String(), - accessKey: s.testServer.AccessKey, - secretKey: s.testServer.SecretKey, - path: path.Join(reservedBucket, s3Path), - loginMethod: "S3.LoginHandler", + s.testAuthConf = authConfig{ + serverAddr: s.testServer.Server.Listener.Addr().String(), + accessKey: s.testServer.AccessKey, + secretKey: s.testServer.SecretKey, + serviceEndpoint: path.Join(reservedBucket, s3Path), + serviceName: "S3", } } @@ -62,10 +62,10 @@ func TestS3PeerRPC(t *testing.T) { // Test S3 RPC handlers func (s *TestRPCS3PeerSuite) testS3PeerRPC(t *testing.T) { // Validate for invalid token. - args := GenericArgs{Token: "garbage", Timestamp: time.Now().UTC()} - rclient := newRPCClient(s.testAuthConf.address, s.testAuthConf.path, false) + args := AuthRPCArgs{AuthToken: "garbage", RequestTime: time.Now().UTC()} + rclient := newRPCClient(s.testAuthConf.serverAddr, s.testAuthConf.serviceEndpoint, false) defer rclient.Close() - err := rclient.Call("S3.SetBucketNotificationPeer", &args, &GenericReply{}) + err := rclient.Call("S3.SetBucketNotificationPeer", &args, &AuthRPCReply{}) if err != nil { if err.Error() != errInvalidToken.Error() { t.Fatal(err) @@ -74,16 +74,16 @@ func (s *TestRPCS3PeerSuite) testS3PeerRPC(t *testing.T) { // Check bucket notification call works. BNPArgs := SetBucketNotificationPeerArgs{Bucket: "bucket", NCfg: ¬ificationConfig{}} - client := newAuthClient(s.testAuthConf) + client := newAuthRPCClient(s.testAuthConf) defer client.Close() - err = client.Call("S3.SetBucketNotificationPeer", &BNPArgs, &GenericReply{}) + err = client.Call("S3.SetBucketNotificationPeer", &BNPArgs, &AuthRPCReply{}) if err != nil { t.Fatal(err) } // Check bucket listener update call works. BLPArgs := SetBucketListenerPeerArgs{Bucket: "bucket", LCfg: nil} - err = client.Call("S3.SetBucketListenerPeer", &BLPArgs, &GenericReply{}) + err = client.Call("S3.SetBucketListenerPeer", &BLPArgs, &AuthRPCReply{}) if err != nil { t.Fatal(err) } @@ -95,14 +95,14 @@ func (s *TestRPCS3PeerSuite) testS3PeerRPC(t *testing.T) { t.Fatal(err) } BPPArgs := SetBucketPolicyPeerArgs{Bucket: "bucket", PChBytes: pChBytes} - err = client.Call("S3.SetBucketPolicyPeer", &BPPArgs, &GenericReply{}) + err = client.Call("S3.SetBucketPolicyPeer", &BPPArgs, &AuthRPCReply{}) if err != nil { t.Fatal(err) } // Check event send event call works. evArgs := EventArgs{Event: nil, Arn: "localhost:9000"} - err = client.Call("S3.Event", &evArgs, &GenericReply{}) + err = client.Call("S3.Event", &evArgs, &AuthRPCReply{}) if err != nil { t.Fatal(err) } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index ebe639dd1..846b17096 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -23,18 +23,14 @@ import ( "net/rpc" "net/url" "path" - "sync" "sync/atomic" - "time" "github.com/minio/minio/pkg/disk" ) type networkStorage struct { networkIOErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG - netAddr string - netPath string - rpcClient *storageRPCClient + rpcClient *AuthRPCClient } const ( @@ -99,104 +95,6 @@ func toStorageErr(err error) error { return err } -// storageRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. -type storageRPCClient struct { - sync.Mutex - cfg storageConfig - rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client - serverToken string // Disk rpc JWT based token. - serverVersion string // Server version exchanged by the RPC. -} - -// Storage config represents authentication credentials and Login -// method name to be used for fetching JWT tokens from the storage -// server. -type storageConfig struct { - addr string // Network address path of storage RPC server. - path string // Network storage path for HTTP dial. - secureConn bool // Indicates if this storage RPC is on a secured connection. - creds credential -} - -// newStorageClient - returns a jwt based authenticated (go) storage rpc client. -func newStorageClient(storageCfg storageConfig) *storageRPCClient { - return &storageRPCClient{ - // Save the config. - cfg: storageCfg, - rpc: newRPCClient(storageCfg.addr, storageCfg.path, storageCfg.secureConn), - } -} - -// Close - closes underlying rpc connection. -func (storageClient *storageRPCClient) Close() error { - storageClient.Lock() - // reset token on closing a connection - storageClient.serverToken = "" - storageClient.Unlock() - return storageClient.rpc.Close() -} - -// Login - a jwt based authentication is performed with rpc server. -func (storageClient *storageRPCClient) Login() (err error) { - storageClient.Lock() - // As soon as the function returns unlock, - defer storageClient.Unlock() - - // Return if token is already set. - if storageClient.serverToken != "" { - return nil - } - - reply := RPCLoginReply{} - if err = storageClient.rpc.Call("Storage.LoginHandler", RPCLoginArgs{ - Username: storageClient.cfg.creds.AccessKey, - Password: storageClient.cfg.creds.SecretKey, - }, &reply); err != nil { - return err - } - - // Validate if version do indeed match. - if reply.ServerVersion != Version { - return errServerVersionMismatch - } - - // Validate if server timestamp is skewed. - curTime := time.Now().UTC() - if curTime.Sub(reply.Timestamp) > globalMaxSkewTime { - return errServerTimeMismatch - } - - // Set token, time stamp as received from a successful login call. - storageClient.serverToken = reply.Token - storageClient.serverVersion = reply.ServerVersion - return nil -} - -// Call - If rpc connection isn't established yet since previous disconnect, -// connection is established, a jwt authenticated login is performed and then -// the call is performed. -func (storageClient *storageRPCClient) Call(serviceMethod string, args interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) -}, reply interface{}) (err error) { - // On successful login, attempt the call. - if err = storageClient.Login(); err != nil { - return err - } - // Set token and timestamp before the rpc call. - args.SetToken(storageClient.serverToken) - args.SetTimestamp(time.Now().UTC()) - - // Call the underlying rpc. - err = storageClient.rpc.Call(serviceMethod, args, reply) - - // Invalidate token, and mark it for re-login. - if err == rpc.ErrShutdown { - storageClient.Close() - } - return err -} - // Initialize new storage rpc client. func newStorageRPC(ep *url.URL) (StorageAPI, error) { if ep == nil { @@ -207,38 +105,35 @@ func newStorageRPC(ep *url.URL) (StorageAPI, error) { rpcPath := path.Join(storageRPCPath, getPath(ep)) rpcAddr := ep.Host - // Initialize rpc client with network address and rpc path. - accessKey := serverConfig.GetCredential().AccessKey - secretKey := serverConfig.GetCredential().SecretKey + serverCred := serverConfig.GetCredential() + accessKey := serverCred.AccessKey + secretKey := serverCred.SecretKey if ep.User != nil { accessKey = ep.User.Username() - if key, set := ep.User.Password(); set { - secretKey = key + if password, ok := ep.User.Password(); ok { + secretKey = password } } - // Initialize network storage. - ndisk := &networkStorage{ - netAddr: ep.Host, - netPath: getPath(ep), - rpcClient: newStorageClient(storageConfig{ - addr: rpcAddr, - path: rpcPath, - creds: credential{ - AccessKey: accessKey, - SecretKey: secretKey, - }, - secureConn: isSSL(), + storageAPI := &networkStorage{ + rpcClient: newAuthRPCClient(authConfig{ + accessKey: accessKey, + secretKey: secretKey, + serverAddr: rpcAddr, + serviceEndpoint: rpcPath, + secureConn: isSSL(), + serviceName: "Storage", + disableReconnect: true, }), } // Returns successfully here. - return ndisk, nil + return storageAPI, nil } // Stringer interface compatible representation of network device. func (n *networkStorage) String() string { - return n.netAddr + ":" + n.netPath + return n.rpcClient.ServerAddr() + ":" + n.rpcClient.ServiceEndpoint() } // Network IO error count is kept at 256 with some simple @@ -250,10 +145,9 @@ func (n *networkStorage) String() string { // incoming i/o. const maxAllowedNetworkIOError = 256 // maximum allowed network IOError. -// Initializes the remote RPC connection by attempting a login attempt. -func (n *networkStorage) Init() (err error) { - // Attempt a login to reconnect. - err = n.rpcClient.Login() +// Init - attempts a login to reconnect. +func (n *networkStorage) Init() error { + err := n.rpcClient.Login() return toStorageErr(err) } @@ -278,7 +172,7 @@ func (n *networkStorage) DiskInfo() (info disk.Info, err error) { return disk.Info{}, errFaultyRemoteDisk } - args := GenericArgs{} + args := AuthRPCArgs{} if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil { return disk.Info{}, toStorageErr(err) } @@ -299,7 +193,7 @@ func (n *networkStorage) MakeVol(volume string) (err error) { return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { return toStorageErr(err) @@ -322,7 +216,7 @@ func (n *networkStorage) ListVols() (vols []VolInfo, err error) { } ListVols := ListVolsReply{} - err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols) + err = n.rpcClient.Call("Storage.ListVolsHandler", &AuthRPCArgs{}, &ListVols) if err != nil { return nil, toStorageErr(err) } @@ -364,7 +258,7 @@ func (n *networkStorage) DeleteVol(volume string) (err error) { return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { return toStorageErr(err) @@ -386,7 +280,7 @@ func (n *networkStorage) PrepareFile(volume, path string, length int64) (err err return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{ Vol: volume, Path: path, @@ -411,7 +305,7 @@ func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err err return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ Vol: volume, Path: path, @@ -545,7 +439,7 @@ func (n *networkStorage) DeleteFile(volume, path string) (err error) { return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ Vol: volume, Path: path, @@ -569,7 +463,7 @@ func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath strin return errFaultyRemoteDisk } - reply := GenericReply{} + reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ SrcVol: srcVolume, SrcPath: srcPath, diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index 8f474feff..e3dd9bf9a 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -19,7 +19,7 @@ package cmd // GenericVolArgs - generic volume args. type GenericVolArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -34,7 +34,7 @@ type ListVolsReply struct { // ReadAllArgs represents read all RPC arguments. type ReadAllArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -46,7 +46,7 @@ type ReadAllArgs struct { // ReadFileArgs represents read file RPC arguments. type ReadFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -64,7 +64,7 @@ type ReadFileArgs struct { // PrepareFileArgs represents append file RPC arguments. type PrepareFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -79,7 +79,7 @@ type PrepareFileArgs struct { // AppendFileArgs represents append file RPC arguments. type AppendFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -94,7 +94,7 @@ type AppendFileArgs struct { // StatFileArgs represents stat file RPC arguments. type StatFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -106,7 +106,7 @@ type StatFileArgs struct { // DeleteFileArgs represents delete file RPC arguments. type DeleteFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -118,7 +118,7 @@ type DeleteFileArgs struct { // ListDirArgs represents list contents RPC arguments. type ListDirArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of the volume. Vol string @@ -130,7 +130,7 @@ type ListDirArgs struct { // RenameFileArgs represents rename file RPC arguments. type RenameFileArgs struct { // Authentication token generated by Login. - GenericArgs + AuthRPCArgs // Name of source volume. SrcVol string diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index 0210a4c8b..6ad287868 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -29,7 +29,7 @@ import ( // Storage server implements rpc primitives to facilitate exporting a // disk over a network. type storageServer struct { - loginServer + AuthRPCServer storage StorageAPI path string timestamp time.Time @@ -38,10 +38,11 @@ type storageServer struct { /// Storage operations handlers. // DiskInfoHandler - disk info handler is rpc wrapper for DiskInfo operation. -func (s *storageServer) DiskInfoHandler(args *GenericArgs, reply *disk.Info) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) DiskInfoHandler(args *AuthRPCArgs, reply *disk.Info) error { + if err := args.IsAuthenticated(); err != nil { + return err } + info, err := s.storage.DiskInfo() *reply = info return err @@ -50,18 +51,20 @@ func (s *storageServer) DiskInfoHandler(args *GenericArgs, reply *disk.Info) err /// Volume operations handlers. // MakeVolHandler - make vol handler is rpc wrapper for MakeVol operation. -func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.MakeVol(args.Vol) } // ListVolsHandler - list vols handler is rpc wrapper for ListVols operation. -func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) ListVolsHandler(args *AuthRPCArgs, reply *ListVolsReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + vols, err := s.storage.ListVols() if err != nil { return err @@ -72,9 +75,10 @@ func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) // StatVolHandler - stat vol handler is a rpc wrapper for StatVol operation. func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken + if err := args.IsAuthenticated(); err != nil { + return err } + volInfo, err := s.storage.StatVol(args.Vol) if err != nil { return err @@ -85,10 +89,11 @@ func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) err // DeleteVolHandler - delete vol handler is a rpc wrapper for // DeleteVol operation. -func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.DeleteVol(args.Vol) } @@ -96,9 +101,10 @@ func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericRep // StatFileHandler - stat file handler is rpc wrapper to stat file. func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken + if err := args.IsAuthenticated(); err != nil { + return err } + fileInfo, err := s.storage.StatFile(args.Vol, args.Path) if err != nil { return err @@ -109,9 +115,10 @@ func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) err // ListDirHandler - list directory handler is rpc wrapper to list dir. func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken + if err := args.IsAuthenticated(); err != nil { + return err } + entries, err := s.storage.ListDir(args.Vol, args.Path) if err != nil { return err @@ -122,9 +129,10 @@ func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error // ReadAllHandler - read all handler is rpc wrapper to read all storage API. func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken + if err := args.IsAuthenticated(); err != nil { + return err } + buf, err := s.storage.ReadAll(args.Vol, args.Path) if err != nil { return err @@ -135,8 +143,8 @@ func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error // ReadFileHandler - read file handler is rpc wrapper to read file. func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) { - if !isAuthTokenValid(args.Token) { - return errInvalidToken + if err = args.IsAuthenticated(); err != nil { + return err } var n int64 @@ -153,34 +161,38 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err } // PrepareFileHandler - prepare file handler is rpc wrapper to prepare file. -func (s *storageServer) PrepareFileHandler(args *PrepareFileArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) PrepareFileHandler(args *PrepareFileArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.PrepareFile(args.Vol, args.Path, args.Size) } // AppendFileHandler - append file handler is rpc wrapper to append file. -func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.AppendFile(args.Vol, args.Path, args.Buffer) } // DeleteFileHandler - delete file handler is rpc wrapper to delete file. -func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.DeleteFile(args.Vol, args.Path) } // RenameFileHandler - rename file handler is rpc wrapper to rename file. -func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *GenericReply) error { - if !isAuthTokenValid(args.Token) { - return errInvalidToken +func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *AuthRPCReply) error { + if err := args.IsAuthenticated(); err != nil { + return err } + return s.storage.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath) } diff --git a/cmd/storage-rpc-server_test.go b/cmd/storage-rpc-server_test.go index a64ebf07c..6ed51d6bc 100644 --- a/cmd/storage-rpc-server_test.go +++ b/cmd/storage-rpc-server_test.go @@ -87,108 +87,113 @@ func TestStorageRPCInvalidToken(t *testing.T) { defer removeAll(st.configDir) storageRPC := st.stServer - timestamp := time.Now().UTC() - ga := GenericArgs{ - Token: st.token, - Timestamp: timestamp, - } - // Construct an invalid token. - badga := ga - badga.Token = "invalidToken" // Following test cases are meant to exercise the invalid // token code path of the storage RPC methods. - var err error - gva := GenericVolArgs{ - GenericArgs: badga, + badAuthRPCArgs := AuthRPCArgs{AuthToken: "invalidToken"} + badGenericVolArgs := GenericVolArgs{ + AuthRPCArgs: badAuthRPCArgs, Vol: "myvol", } // 1. DiskInfoHandler diskInfoReply := &disk.Info{} - err = storageRPC.DiskInfoHandler(&badga, diskInfoReply) + badAuthRPCArgs.RequestTime = time.Now().UTC() + err = storageRPC.DiskInfoHandler(&badAuthRPCArgs, diskInfoReply) errorIfInvalidToken(t, err) // 2. MakeVolHandler - makeVolArgs := &gva - makeVolReply := &GenericReply{} + makeVolArgs := &badGenericVolArgs + makeVolArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + makeVolReply := &AuthRPCReply{} err = storageRPC.MakeVolHandler(makeVolArgs, makeVolReply) errorIfInvalidToken(t, err) // 3. ListVolsHandler listVolReply := &ListVolsReply{} - err = storageRPC.ListVolsHandler(&badga, listVolReply) + badAuthRPCArgs.RequestTime = time.Now().UTC() + err = storageRPC.ListVolsHandler(&badAuthRPCArgs, listVolReply) errorIfInvalidToken(t, err) // 4. StatVolHandler statVolReply := &VolInfo{} - statVolArgs := &gva + statVolArgs := &badGenericVolArgs + statVolArgs.AuthRPCArgs.RequestTime = time.Now().UTC() err = storageRPC.StatVolHandler(statVolArgs, statVolReply) errorIfInvalidToken(t, err) // 5. DeleteVolHandler - deleteVolArgs := &gva - deleteVolReply := &GenericReply{} + deleteVolArgs := &badGenericVolArgs + deleteVolArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + deleteVolReply := &AuthRPCReply{} err = storageRPC.DeleteVolHandler(deleteVolArgs, deleteVolReply) errorIfInvalidToken(t, err) // 6. StatFileHandler statFileArgs := &StatFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } + statFileArgs.AuthRPCArgs.RequestTime = time.Now().UTC() statReply := &FileInfo{} err = storageRPC.StatFileHandler(statFileArgs, statReply) errorIfInvalidToken(t, err) // 7. ListDirHandler listDirArgs := &ListDirArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } + listDirArgs.AuthRPCArgs.RequestTime = time.Now().UTC() listDirReply := &[]string{} err = storageRPC.ListDirHandler(listDirArgs, listDirReply) errorIfInvalidToken(t, err) // 8. ReadAllHandler readFileArgs := &ReadFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } + readFileArgs.AuthRPCArgs.RequestTime = time.Now().UTC() readFileReply := &[]byte{} err = storageRPC.ReadAllHandler(readFileArgs, readFileReply) errorIfInvalidToken(t, err) // 9. ReadFileHandler + readFileArgs.AuthRPCArgs.RequestTime = time.Now().UTC() err = storageRPC.ReadFileHandler(readFileArgs, readFileReply) errorIfInvalidToken(t, err) // 10. PrepareFileHandler prepFileArgs := &PrepareFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } - prepFileReply := &GenericReply{} + prepFileArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + prepFileReply := &AuthRPCReply{} err = storageRPC.PrepareFileHandler(prepFileArgs, prepFileReply) errorIfInvalidToken(t, err) // 11. AppendFileHandler appendArgs := &AppendFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } - appendReply := &GenericReply{} + appendArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + appendReply := &AuthRPCReply{} err = storageRPC.AppendFileHandler(appendArgs, appendReply) errorIfInvalidToken(t, err) // 12. DeleteFileHandler delFileArgs := &DeleteFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } - delFileRely := &GenericReply{} + delFileArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + delFileRely := &AuthRPCReply{} err = storageRPC.DeleteFileHandler(delFileArgs, delFileRely) errorIfInvalidToken(t, err) // 13. RenameFileHandler renameArgs := &RenameFileArgs{ - GenericArgs: badga, + AuthRPCArgs: badAuthRPCArgs, } - renameReply := &GenericReply{} + renameArgs.AuthRPCArgs.RequestTime = time.Now().UTC() + renameReply := &AuthRPCReply{} err = storageRPC.RenameFileHandler(renameArgs, renameReply) errorIfInvalidToken(t, err) } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index ba783bcbe..97fd99218 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -56,9 +56,11 @@ import ( // Tests should initNSLock only once. func init() { + // Set as non-distributed. + globalIsDistXL = false + // Initialize name space lock. - isDist := false - initNSLock(isDist) + initNSLock(globalIsDistXL) // Disable printing console messages during tests. color.Output = ioutil.Discard @@ -426,9 +428,6 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { // Run TestServer. testRPCServer.Server = httptest.NewServer(mux) - // Set as non-distributed. - globalIsDistXL = false - // initialize remainder of serverCmdConfig testRPCServer.SrvCmdCfg = srvCfg diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 80efbf61b..89d0aae67 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -232,6 +232,12 @@ func TestLocalAddress(t *testing.T) { if runtime.GOOS == "windows" { return } + + currentIsDistXL := globalIsDistXL + defer func() { + globalIsDistXL = currentIsDistXL + }() + // need to set this to avoid stale values from other tests. globalMinioPort = "9000" globalMinioHost = "" diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 881abf7fc..5d93a9d81 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -94,7 +94,7 @@ type StorageInfoRep struct { } // StorageInfo - web call to gather storage usage statistics. -func (web *webAPIHandlers) StorageInfo(r *http.Request, args *GenericArgs, reply *StorageInfoRep) error { +func (web *webAPIHandlers) StorageInfo(r *http.Request, args *AuthRPCArgs, reply *StorageInfoRep) error { objectAPI := web.ObjectAPI() if objectAPI == nil { return toJSONError(errServerNotInitialized) diff --git a/cmd/web-handlers_test.go b/cmd/web-handlers_test.go index 42cad932a..d187f3d64 100644 --- a/cmd/web-handlers_test.go +++ b/cmd/web-handlers_test.go @@ -100,7 +100,7 @@ func getWebRPCToken(apiRouter http.Handler, accessKey, secretKey string) (token rec := httptest.NewRecorder() request := LoginArgs{Username: accessKey, Password: secretKey} reply := &LoginRep{} - req, err := newTestWebRPCRequest("Web.Login", "", request) + req, err := newTestWebRPCRequest("Web"+loginMethodName, "", request) if err != nil { return "", err } @@ -193,7 +193,7 @@ func testStorageInfoWebHandler(obj ObjectLayer, instanceType string, t TestErrHa rec := httptest.NewRecorder() - storageInfoRequest := GenericArgs{} + storageInfoRequest := AuthRPCArgs{} storageInfoReply := &StorageInfoRep{} req, err := newTestWebRPCRequest("Web.StorageInfo", authorization, storageInfoRequest) if err != nil { @@ -239,7 +239,7 @@ func testServerInfoWebHandler(obj ObjectLayer, instanceType string, t TestErrHan rec := httptest.NewRecorder() - serverInfoRequest := GenericArgs{} + serverInfoRequest := AuthRPCArgs{} serverInfoReply := &ServerInfoRep{} req, err := newTestWebRPCRequest("Web.ServerInfo", authorization, serverInfoRequest) if err != nil { @@ -1204,7 +1204,7 @@ func TestWebCheckAuthorization(t *testing.T) { "PresignedGet", } for _, rpcCall := range webRPCs { - args := &GenericArgs{} + args := &AuthRPCArgs{} reply := &WebGenericRep{} req, nerr := newTestWebRPCRequest("Web."+rpcCall, "Bearer fooauthorization", args) if nerr != nil { @@ -1288,7 +1288,7 @@ func TestWebObjectLayerNotReady(t *testing.T) { webRPCs := []string{"StorageInfo", "MakeBucket", "ListBuckets", "ListObjects", "RemoveObject", "GetBucketPolicy", "SetBucketPolicy", "ListAllBucketPolicies"} for _, rpcCall := range webRPCs { - args := &GenericArgs{} + args := &AuthRPCArgs{} reply := &WebGenericRep{} req, nerr := newTestWebRPCRequest("Web."+rpcCall, authorization, args) if nerr != nil { @@ -1392,7 +1392,7 @@ func TestWebObjectLayerFaultyDisks(t *testing.T) { "GetBucketPolicy", "SetBucketPolicy"} for _, rpcCall := range webRPCs { - args := &GenericArgs{} + args := &AuthRPCArgs{} reply := &WebGenericRep{} req, nerr := newTestWebRPCRequest("Web."+rpcCall, authorization, args) if nerr != nil { @@ -1409,7 +1409,7 @@ func TestWebObjectLayerFaultyDisks(t *testing.T) { } // Test Web.StorageInfo - storageInfoRequest := GenericArgs{} + storageInfoRequest := AuthRPCArgs{} storageInfoReply := &StorageInfoRep{} req, err := newTestWebRPCRequest("Web.StorageInfo", authorization, storageInfoRequest) if err != nil { diff --git a/vendor/github.com/minio/dsync/README.md b/vendor/github.com/minio/dsync/README.md index 206e2185c..a95f5debe 100644 --- a/vendor/github.com/minio/dsync/README.md +++ b/vendor/github.com/minio/dsync/README.md @@ -193,7 +193,7 @@ The basic steps in the lock process are as follows: ### Unlock process The unlock process is really simple: -- boardcast unlock message to all nodes that granted lock +- broadcast unlock message to all nodes that granted lock - if a destination is not available, retry with gradually longer back-off window to still deliver - ignore the 'result' (cover for cases where destination node has gone down and came back up) diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index 450c3043b..b15bd4fb8 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -19,7 +19,7 @@ package dsync import ( cryptorand "crypto/rand" "fmt" - "log" + golog "log" "math" "math/rand" "net" @@ -36,6 +36,12 @@ func init() { dsyncLog = os.Getenv("DSYNC_LOG") == "1" } +func log(msg ...interface{}) { + if dsyncLog { + golog.Println(msg...) + } +} + // DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before. const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms. @@ -60,23 +66,6 @@ func isLocked(uid string) bool { return len(uid) > 0 } -type LockArgs struct { - Token string - Timestamp time.Time - Name string - Node string - RPCPath string - UID string -} - -func (l *LockArgs) SetToken(token string) { - l.Token = token -} - -func (l *LockArgs) SetTimestamp(tstamp time.Time) { - l.Timestamp = tstamp -} - func NewDRWMutex(name string) *DRWMutex { return &DRWMutex{ Name: name, @@ -152,7 +141,7 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) { // lock tries to acquire the distributed lock, returning true or false // -func lock(clnts []RPC, locks *[]string, lockName string, isReadLock bool) bool { +func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) bool { // Create buffered channel of size equal to total number of nodes. ch := make(chan Granted, dnodeCount) @@ -160,25 +149,29 @@ func lock(clnts []RPC, locks *[]string, lockName string, isReadLock bool) bool { for index, c := range clnts { // broadcast lock request to all nodes - go func(index int, isReadLock bool, c RPC) { + go func(index int, isReadLock bool, c NetLocker) { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running go routines. - var locked bool bytesUid := [16]byte{} cryptorand.Read(bytesUid[:]) uid := fmt.Sprintf("%X", bytesUid[:]) - args := LockArgs{Name: lockName, Node: clnts[ownNode].Node(), RPCPath: clnts[ownNode].RPCPath(), UID: uid} + + args := LockArgs{ + UID: uid, + Resource: lockName, + ServerAddr: clnts[ownNode].ServerAddr(), + ServiceEndpoint: clnts[ownNode].ServiceEndpoint(), + } + + var locked bool + var err error if isReadLock { - if err := c.Call("Dsync.RLock", &args, &locked); err != nil { - if dsyncLog { - log.Println("Unable to call Dsync.RLock", err) - } + if locked, err = c.RLock(args); err != nil { + log("Unable to call RLock", err) } } else { - if err := c.Call("Dsync.Lock", &args, &locked); err != nil { - if dsyncLog { - log.Println("Unable to call Dsync.Lock", err) - } + if locked, err = c.Lock(args); err != nil { + log("Unable to call Lock", err) } } @@ -284,7 +277,7 @@ func quorumMet(locks *[]string, isReadLock bool) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(clnts []RPC, locks *[]string, lockName string, isReadLock bool) { +func releaseAll(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) { for lock := 0; lock < dnodeCount; lock++ { if isLocked((*locks)[lock]) { sendRelease(clnts[lock], lockName, (*locks)[lock], isReadLock) @@ -385,7 +378,7 @@ func (dm *DRWMutex) ForceUnlock() { } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c RPC, name, uid string, isReadLock bool) { +func sendRelease(c NetLocker, name, uid string, isReadLock bool) { backOffArray := []time.Duration{ 30 * time.Second, // 30secs. @@ -396,55 +389,47 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) { 1 * time.Hour, // 1hr. } - go func(c RPC, name string) { + go func(c NetLocker, name string) { for _, backOff := range backOffArray { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running goroutines. - var unlocked bool - args := LockArgs{Name: name, UID: uid} // Just send name & uid (and leave out node and rpcPath; unimportant for unlocks) + args := LockArgs{ + UID: uid, + Resource: name, + ServerAddr: clnts[ownNode].ServerAddr(), + ServiceEndpoint: clnts[ownNode].ServiceEndpoint(), + } + + var err error if len(uid) == 0 { - if err := c.Call("Dsync.ForceUnlock", &args, &unlocked); err == nil { - // ForceUnlock delivered, exit out - return - } else if err != nil { - if dsyncLog { - log.Println("Unable to call Dsync.ForceUnlock", err) - } - if nErr, ok := err.(net.Error); ok && nErr.Timeout() { - // ForceUnlock possibly failed with server timestamp mismatch, server may have restarted. - return - } + if _, err = c.ForceUnlock(args); err != nil { + log("Unable to call ForceUnlock", err) } } else if isReadLock { - if err := c.Call("Dsync.RUnlock", &args, &unlocked); err == nil { - // RUnlock delivered, exit out - return - } else if err != nil { - if dsyncLog { - log.Println("Unable to call Dsync.RUnlock", err) - } - if nErr, ok := err.(net.Error); ok && nErr.Timeout() { - // RUnlock possibly failed with server timestamp mismatch, server may have restarted. - return - } + if _, err = c.RUnlock(args); err != nil { + log("Unable to call RUnlock", err) } } else { - if err := c.Call("Dsync.Unlock", &args, &unlocked); err == nil { - // Unlock delivered, exit out - return - } else if err != nil { - if dsyncLog { - log.Println("Unable to call Dsync.Unlock", err) - } - if nErr, ok := err.(net.Error); ok && nErr.Timeout() { - // Unlock possibly failed with server timestamp mismatch, server may have restarted. - return - } + if _, err = c.Unlock(args); err != nil { + log("Unable to call Unlock", err) } } + if err != nil { + // Ignore if err is net.Error and it is occurred due to timeout. + // The cause could have been server timestamp mismatch or server may have restarted. + // FIXME: This is minio specific behaviour and we would need a way to make it generically. + if nErr, ok := err.(net.Error); ok && nErr.Timeout() { + err = nil + } + } + + if err == nil { + return + } + // Wait.. time.Sleep(backOff) } diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index 9375e0444..ba027d54f 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -18,16 +18,11 @@ package dsync import "errors" -const RpcPath = "/dsync" -const DebugPath = "/debug" - -const DefaultPath = "/rpc/dsync" - // Number of nodes participating in the distributed locking. var dnodeCount int // List of rpc client objects, one per lock server. -var clnts []RPC +var clnts []NetLocker // Index into rpc client array for server running on localhost var ownNode int @@ -38,20 +33,21 @@ var dquorum int // Simple quorum for read operations, set to dNodeCount/2 var dquorumReads int -// SetNodesWithPath - initializes package-level global state variables such as clnts. -// N B - This function should be called only once inside any program that uses -// dsync. -func SetNodesWithClients(rpcClnts []RPC, rpcOwnNode int) (err error) { +// Init - initializes package-level global state variables such as clnts. +// N B - This function should be called only once inside any program +// that uses dsync. +func Init(rpcClnts []NetLocker, rpcOwnNode int) (err error) { // Validate if number of nodes is within allowable range. if dnodeCount != 0 { return errors.New("Cannot reinitialize dsync package") - } else if len(rpcClnts) < 4 { - return errors.New("Dsync not designed for less than 4 nodes") + } + if len(rpcClnts) < 4 { + return errors.New("Dsync is not designed for less than 4 nodes") } else if len(rpcClnts) > 16 { - return errors.New("Dsync not designed for more than 16 nodes") - } else if len(rpcClnts)&1 == 1 { - return errors.New("Dsync not designed for an uneven number of nodes") + return errors.New("Dsync is not designed for more than 16 nodes") + } else if len(rpcClnts)%2 != 0 { + return errors.New("Dsync is not designed for an uneven number of nodes") } if rpcOwnNode > len(rpcClnts) { @@ -61,8 +57,8 @@ func SetNodesWithClients(rpcClnts []RPC, rpcOwnNode int) (err error) { dnodeCount = len(rpcClnts) dquorum = dnodeCount/2 + 1 dquorumReads = dnodeCount / 2 - // Initialize node name and rpc path for each RPCClient object. - clnts = make([]RPC, dnodeCount) + // Initialize node name and rpc path for each NetLocker object. + clnts = make([]NetLocker, dnodeCount) copy(clnts, rpcClnts) ownNode = rpcOwnNode diff --git a/vendor/github.com/minio/dsync/rpc-client-interface.go b/vendor/github.com/minio/dsync/rpc-client-interface.go index 035faddea..09d4c61ad 100644 --- a/vendor/github.com/minio/dsync/rpc-client-interface.go +++ b/vendor/github.com/minio/dsync/rpc-client-interface.go @@ -16,15 +16,51 @@ package dsync -import "time" +// LockArgs is minimal required values for any dsync compatible lock operation. +type LockArgs struct { + // Unique ID of lock/unlock request. + UID string -// RPC - is dsync compatible client interface. -type RPC interface { - Call(serviceMethod string, args interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) - }, reply interface{}) error - Node() string - RPCPath() string - Close() error + // Resource contains a entity to be locked/unlocked. + Resource string + + // ServerAddr contains the address of the server who requested lock/unlock of the above resource. + ServerAddr string + + // ServiceEndpoint contains the network path of above server to do lock/unlock. + ServiceEndpoint string +} + +// NetLocker is dsync compatible locker interface. +type NetLocker interface { + // Do read lock for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of lock request operation. + RLock(args LockArgs) (bool, error) + + // Do write lock for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of lock request operation. + Lock(args LockArgs) (bool, error) + + // Do read unlock for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of unlock request operation. + RUnlock(args LockArgs) (bool, error) + + // Do write unlock for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of unlock request operation. + Unlock(args LockArgs) (bool, error) + + // Unlock (read/write) forcefully for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of unlock request operation. + ForceUnlock(args LockArgs) (bool, error) + + // Return this lock server address. + ServerAddr() string + + // Return this lock server service endpoint on which the server runs. + ServiceEndpoint() string } diff --git a/vendor/vendor.json b/vendor/vendor.json index e776a4e0e..93d3bcfc8 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -148,10 +148,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "ddMyebkzU3xB7K8dAhM1S+Mflmo=", + "checksumSHA1": "NBGyq2+iTtJvJ+ElG4FzHLe1WSY=", "path": "github.com/minio/dsync", - "revision": "dd0da3743e6668b03559c2905cc661bc0fceeae3", - "revisionTime": "2016-11-28T22:07:34Z" + "revision": "9cafd4d729eb71b31ef7851a8c8f6ceb855d0915", + "revisionTime": "2016-12-23T07:07:24Z" }, { "path": "github.com/minio/go-homedir",