From e1b0985b5be01e48caf597d70a9fc499043690f4 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 23 Aug 2016 19:19:24 -0700 Subject: [PATCH] rpc: Refactor authentication and login changes. (#2543) - Cache login requests. - Converge validating distributed setup. --- cmd/auth-rpc-client.go | 46 ++++++++++++++----------- cmd/lock-rpc-server.go | 26 ++++++++------ cmd/namespace-lock.go | 41 ++++++---------------- cmd/server-main.go | 23 ++++++++++--- cmd/storage-rpc-client.go | 53 +++++++++++++---------------- cmd/storage-rpc-server-datatypes.go | 12 ++----- 6 files changed, 98 insertions(+), 103 deletions(-) diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 0cd29292d..f72ec4703 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -27,6 +27,7 @@ import ( type AuthRPCClient struct { rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client cred credential // AccessKey and SecretKey + isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. token string // JWT based token tstamp time.Time // Timestamp as received on Login RPC. loginMethod string // RPC service name for authenticating using JWT @@ -37,6 +38,7 @@ func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *A return &AuthRPCClient{ rpc: newClient(node, rpcPath), cred: cred, + isLoggedIn: false, // Not logged in yet. loginMethod: loginMethod, } } @@ -44,42 +46,46 @@ func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *A // Close - closes underlying rpc connection. func (authClient *AuthRPCClient) Close() error { // reset token on closing a connection - authClient.token = "" + authClient.isLoggedIn = false return authClient.rpc.Close() } // Login - a jwt based authentication is performed with rpc server. -func (authClient *AuthRPCClient) Login() (string, time.Time, error) { +func (authClient *AuthRPCClient) Login() error { + // Return if already logged in. + if authClient.isLoggedIn { + return nil + } reply := RPCLoginReply{} if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{ Username: authClient.cred.AccessKeyID, Password: authClient.cred.SecretAccessKey, }, &reply); err != nil { - return "", time.Time{}, err + return err } - return reply.Token, reply.Timestamp, nil + // Set token, time stamp as received from a successful login call. + authClient.token = reply.Token + authClient.tstamp = reply.Timestamp + authClient.isLoggedIn = true + return nil } // Call - If rpc connection isn't established yet since previous disconnect, // connection is established, a jwt authenticated login is performed and then // the call is performed. -func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) error { - if authClient.token == "" { - token, tstamp, err := authClient.Login() - if err != nil { - return err +func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) (err error) { + // On successful login, attempt the call. + if err = authClient.Login(); err == nil { + // Set token and timestamp before the rpc call. + args.SetToken(authClient.token) + args.SetTimestamp(authClient.tstamp) + + // .. + err = authClient.rpc.Call(serviceMethod, args, reply) + // Invalidate token to mark for re-login on subsequent reconnect. + if err != nil && err == rpc.ErrShutdown { + authClient.isLoggedIn = false } - // set token, time stamp as received from a successful login call. 
- authClient.token = token - authClient.tstamp = tstamp - // Update the RPC call's token with that received from the recent login call. - args.SetToken(token) - args.SetTimestamp(tstamp) - } - err := authClient.rpc.Call(serviceMethod, args, reply) - // Reset token on disconnect to mark for re-login on subsequent reconnect. - if err != nil && err == rpc.ErrShutdown { - authClient.token = "" } return err } diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index bc757d160..f2c3ef28b 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -57,8 +57,14 @@ type lockServer struct { timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart. } -func (l *lockServer) verifyTimestamp(tstamp time.Time) bool { - return l.timestamp.Equal(tstamp) +func (l *lockServer) verifyArgs(args *LockArgs) error { + if !l.timestamp.Equal(args.Timestamp) { + return errors.New("Timestamps don't match, server may have restarted.") + } + if !isRPCTokenValid(args.Token) { + return errors.New("Invalid token") + } + return nil } /// Distributed lock handlers @@ -85,8 +91,8 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if l.verifyTimestamp(args.Timestamp) { - return errors.New("Timestamps don't match, server may have restarted.") + if err := l.verifyArgs(args); err != nil { + return err } _, ok := l.lockMap[args.Name] // No locks held on the given name. @@ -104,8 +110,8 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error { func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if l.verifyTimestamp(args.Timestamp) { - return errors.New("Timestamps don't match, server may have restarted.") + if err := l.verifyArgs(args); err != nil { + return err } _, ok := l.lockMap[args.Name] // No lock is held on the given name, there must be some issue at the lock client side. @@ -120,8 +126,8 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if l.verifyTimestamp(args.Timestamp) { - return errors.New("Timestamps don't match, server may have restarted.") + if err := l.verifyArgs(args); err != nil { + return err } locksHeld, ok := l.lockMap[args.Name] // No locks held on the given name. @@ -144,8 +150,8 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if l.verifyTimestamp(args.Timestamp) { - return errors.New("Timestamps don't match, server may have restarted.") + if err := l.verifyArgs(args); err != nil { + return err } locksHeld, ok := l.lockMap[args.Name] if !ok { diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 53bb5b124..b8791ad25 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -18,7 +18,7 @@ package cmd import ( "errors" - pathpkg "path" + pathutil "path" "strconv" "strings" "sync" @@ -31,39 +31,20 @@ var nsMutex *nsLockMap // Initialize distributed locking only in case of distributed setup. // Returns if the setup is distributed or not on success. -func initDsyncNodes(disks []string, port int) (bool, error) { - // Holds a bool indicating whether this server instance is part of - // distributed setup or not. 
- var isDist = false - // List of lock servers that part in the co-operative namespace locking. - var dsyncNodes []string - // Corresponding rpc paths needed for communication over net/rpc - var rpcPaths []string - - // Port to connect to for the lock servers in a distributed setup. +func initDsyncNodes(disks []string, port int) error { serverPort := strconv.Itoa(port) - + cred := serverConfig.GetCredential() + loginMethod := "Dsync.LoginHandler" + // Initialize rpc lock client information only if this instance is a distributed setup. + var clnts []dsync.RPC for _, disk := range disks { if idx := strings.LastIndex(disk, ":"); idx != -1 { - dsyncNodes = append(dsyncNodes, disk[:idx]+":"+serverPort) - rpcPaths = append(rpcPaths, pathpkg.Join(lockRPCPath, disk[idx+1:])) - } - if !isLocalStorage(disk) { - // One or more disks supplied as arguments are not - // attached to the local node. - isDist = true + dsyncAddr := disk[:idx] + ":" + serverPort // Construct a new dsync server addr. + rpcPath := pathutil.Join(lockRPCPath, disk[idx+1:]) // Construct a new rpc path for the disk. + clnts = append(clnts, newAuthClient(dsyncAddr, rpcPath, cred, loginMethod)) } } - // Initialize rpc lock client information only if this instance is a - // distributed setup. - clnts := make([]dsync.RPC, len(disks)) - for i := 0; i < len(disks); i++ { - clnts[i] = newAuthClient(dsyncNodes[i], rpcPaths[i], serverConfig.GetCredential(), "Dsync.LoginHandler") - } - if isDist { - return isDist, dsync.SetNodesWithClients(clnts) - } - return isDist, nil + return dsync.SetNodesWithClients(clnts) } // initNSLock - initialize name space lock map. @@ -112,7 +93,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { nsLk = &nsLock{ RWLocker: func() RWLocker { if n.isDist { - return dsync.NewDRWMutex(pathpkg.Join(volume, path)) + return dsync.NewDRWMutex(pathutil.Join(volume, path)) } return &sync.RWMutex{} }(), diff --git a/cmd/server-main.go b/cmd/server-main.go index 284335b1c..7a45080c5 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -227,6 +227,19 @@ func getPort(address string) int { return portInt } +// Returns if slice of disks is a distributed setup. +func isDistributedSetup(disks []string) (isDist bool) { + // Port to connect to for the lock servers in a distributed setup. + for _, disk := range disks { + if !isLocalStorage(disk) { + // One or more disks supplied as arguments are not + // attached to the local node. + isDist = true + } + } + return isDist +} + // serverMain handler called for 'minio server' command. func serverMain(c *cli.Context) { // Check 'server' cli arguments. @@ -252,10 +265,12 @@ func serverMain(c *cli.Context) { // Disks to be used in server init. disks := c.Args() - // Set nodes for dsync - var isDist bool - isDist, err = initDsyncNodes(disks, port) - fatalIf(err, "Unable to initialize distributed locking") + isDist := isDistributedSetup(disks) + // Set nodes for dsync for distributed setup. + if isDist { + err = initDsyncNodes(disks, port) + fatalIf(err, "Unable to initialize distributed locking") + } // Initialize name space lock. initNSLock(isDist) diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index d5c0d3a39..7b93b1433 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -114,7 +114,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) { // MakeVol - make a volume. 
func (n networkStorage) MakeVol(volume string) error { reply := GenericReply{} - args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { return toStorageErr(err) } @@ -124,7 +124,7 @@ func (n networkStorage) MakeVol(volume string) error { // ListVols - List all volumes. func (n networkStorage) ListVols() (vols []VolInfo, err error) { ListVols := ListVolsReply{} - err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, &ListVols) + err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) { // StatVol - get current Stat volume info. func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { - args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + args := GenericVolArgs{Vol: volume} if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { return VolInfo{}, toStorageErr(err) } @@ -143,7 +143,7 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - Delete a volume. func (n networkStorage) DeleteVol(volume string) error { reply := GenericReply{} - args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} + args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { return toStorageErr(err) } @@ -156,10 +156,9 @@ func (n networkStorage) DeleteVol(volume string) error { func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { reply := GenericReply{} if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, - Buffer: buffer, + Vol: volume, + Path: path, + Buffer: buffer, }, &reply); err != nil { return toStorageErr(err) } @@ -169,9 +168,8 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro // StatFile - get latest Stat information for a file at path. func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, + Vol: volume, + Path: path, }, &fileInfo); err != nil { return FileInfo{}, toStorageErr(err) } @@ -184,9 +182,8 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er // not use this on large files as it would cause server to crash. 
func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, + Vol: volume, + Path: path, }, &buf); err != nil { return nil, toStorageErr(err) } @@ -197,11 +194,10 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { var result []byte err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, - Offset: offset, - Size: len(buffer), + Vol: volume, + Path: path, + Offset: offset, + Size: len(buffer), }, &result) // Copy results to buffer. copy(buffer, result) @@ -212,9 +208,8 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe // ListDir - list all entries at prefix. func (n networkStorage) ListDir(volume, path string) (entries []string, err error) { if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, + Vol: volume, + Path: path, }, &entries); err != nil { return nil, toStorageErr(err) } @@ -226,9 +221,8 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro func (n networkStorage) DeleteFile(volume, path string) (err error) { reply := GenericReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ - GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, - Vol: volume, - Path: path, + Vol: volume, + Path: path, }, &reply); err != nil { return toStorageErr(err) } @@ -239,11 +233,10 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) { func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { reply := GenericReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ - GenericArgs: GenericArgs{Token: n.rpcClient.token}, - SrcVol: srcVolume, - SrcPath: srcPath, - DstVol: dstVolume, - DstPath: dstPath, + SrcVol: srcVolume, + SrcPath: srcPath, + DstVol: dstVolume, + DstPath: dstPath, }, &reply); err != nil { return toStorageErr(err) } diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index 798e2b3cd..6c682426d 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -18,16 +18,8 @@ package cmd import "time" -// TokenSetter is to be implemented by types that need a way to update member that represents a token. -// e.g, See GenericArgs. -type TokenSetter interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) -} - // GenericReply represents any generic RPC reply. -type GenericReply struct { -} +type GenericReply struct{} // GenericArgs represents any generic RPC arguments. type GenericArgs struct { @@ -78,6 +70,7 @@ type ListVolsReply struct { type ReadAllArgs struct { // Authentication token generated by Login. GenericArgs + // Name of the volume. Vol string @@ -89,6 +82,7 @@ type ReadAllArgs struct { type ReadFileArgs struct { // Authentication token generated by Login. GenericArgs + // Name of the volume. Vol string
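
A note on the core of this change, for readers skimming the diff: AuthRPCClient now caches the result of Login (Login returns early once isLoggedIn is set), Call stamps every outgoing request with the cached JWT token and login timestamp via SetToken/SetTimestamp, and the cached login is invalidated only when the transport reports rpc.ErrShutdown, so the next Call re-authenticates after a reconnect. The stand-alone Go sketch below illustrates that flow under simplified assumptions; rpcCaller, authArgs, authClient and loginReply are illustrative stand-ins for the patch's RPCClient, dsync.TokenSetter, AuthRPCClient and RPCLoginReply types, not the actual MinIO code.

package sketch

import (
	"net/rpc"
	"time"
)

// rpcCaller abstracts the reconnecting net/rpc transport (stand-in for RPCClient).
type rpcCaller interface {
	Call(serviceMethod string, args interface{}, reply interface{}) error
}

// authArgs is a stand-in for dsync.TokenSetter: RPC arguments that carry the
// JWT token and the timestamp received at login time.
type authArgs interface {
	SetToken(token string)
	SetTimestamp(tstamp time.Time)
}

// loginReply mirrors RPCLoginReply: token and timestamp from a successful login.
type loginReply struct {
	Token     string
	Timestamp time.Time
}

// authClient caches a successful login so that repeated Call invocations on a
// healthy connection skip the authentication round trip.
type authClient struct {
	rpc         rpcCaller
	loginMethod string // e.g. "Dsync.LoginHandler"
	accessKey   string
	secretKey   string

	isLoggedIn bool // true once Login has succeeded on the current connection
	token      string
	tstamp     time.Time
}

// Login authenticates once per connection; it is a no-op while the cached
// login is still considered valid.
func (c *authClient) Login() error {
	if c.isLoggedIn {
		return nil // cached: skip the login round trip
	}
	var reply loginReply
	args := struct{ Username, Password string }{c.accessKey, c.secretKey}
	if err := c.rpc.Call(c.loginMethod, args, &reply); err != nil {
		return err
	}
	c.token, c.tstamp, c.isLoggedIn = reply.Token, reply.Timestamp, true
	return nil
}

// Call logs in if needed, stamps the arguments with the cached token and
// timestamp, performs the RPC, and invalidates the cached login when the
// connection has shut down so the next Call re-authenticates after reconnect.
func (c *authClient) Call(serviceMethod string, args authArgs, reply interface{}) error {
	if err := c.Login(); err != nil {
		return err
	}
	args.SetToken(c.token)
	args.SetTimestamp(c.tstamp)
	err := c.rpc.Call(serviceMethod, args, reply)
	if err == rpc.ErrShutdown {
		c.isLoggedIn = false // force a fresh login on the next attempt
	}
	return err
}

On a healthy connection, back-to-back Call invocations reuse the cached token and timestamp; only after the transport returns rpc.ErrShutdown does the next Call trigger a fresh login, which matches the reconnect-then-relogin behavior this patch introduces.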