rpc: Refactor authentication and login changes. (#2543)

- Cache login requests.
- Converge validating distributed setup.
This commit is contained in:
Harshavardhana 2016-08-23 19:19:24 -07:00
parent c8dfc4cda4
commit e1b0985b5b
6 changed files with 98 additions and 103 deletions

View File

@ -27,6 +27,7 @@ import (
type AuthRPCClient struct { type AuthRPCClient struct {
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
cred credential // AccessKey and SecretKey cred credential // AccessKey and SecretKey
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid.
token string // JWT based token token string // JWT based token
tstamp time.Time // Timestamp as received on Login RPC. tstamp time.Time // Timestamp as received on Login RPC.
loginMethod string // RPC service name for authenticating using JWT loginMethod string // RPC service name for authenticating using JWT
@ -37,6 +38,7 @@ func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *A
return &AuthRPCClient{ return &AuthRPCClient{
rpc: newClient(node, rpcPath), rpc: newClient(node, rpcPath),
cred: cred, cred: cred,
isLoggedIn: false, // Not logged in yet.
loginMethod: loginMethod, loginMethod: loginMethod,
} }
} }
@ -44,42 +46,46 @@ func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *A
// Close - closes underlying rpc connection. // Close - closes underlying rpc connection.
func (authClient *AuthRPCClient) Close() error { func (authClient *AuthRPCClient) Close() error {
// reset token on closing a connection // reset token on closing a connection
authClient.token = "" authClient.isLoggedIn = false
return authClient.rpc.Close() return authClient.rpc.Close()
} }
// Login - a jwt based authentication is performed with rpc server. // Login - a jwt based authentication is performed with rpc server.
func (authClient *AuthRPCClient) Login() (string, time.Time, error) { func (authClient *AuthRPCClient) Login() error {
// Return if already logged in.
if authClient.isLoggedIn {
return nil
}
reply := RPCLoginReply{} reply := RPCLoginReply{}
if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{ if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{
Username: authClient.cred.AccessKeyID, Username: authClient.cred.AccessKeyID,
Password: authClient.cred.SecretAccessKey, Password: authClient.cred.SecretAccessKey,
}, &reply); err != nil { }, &reply); err != nil {
return "", time.Time{}, err return err
} }
return reply.Token, reply.Timestamp, nil // Set token, time stamp as received from a successful login call.
authClient.token = reply.Token
authClient.tstamp = reply.Timestamp
authClient.isLoggedIn = true
return nil
} }
// Call - If rpc connection isn't established yet since previous disconnect, // Call - If rpc connection isn't established yet since previous disconnect,
// connection is established, a jwt authenticated login is performed and then // connection is established, a jwt authenticated login is performed and then
// the call is performed. // the call is performed.
func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) error { func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) (err error) {
if authClient.token == "" { // On successful login, attempt the call.
token, tstamp, err := authClient.Login() if err = authClient.Login(); err == nil {
if err != nil { // Set token and timestamp before the rpc call.
return err args.SetToken(authClient.token)
args.SetTimestamp(authClient.tstamp)
// ..
err = authClient.rpc.Call(serviceMethod, args, reply)
// Invalidate token to mark for re-login on subsequent reconnect.
if err != nil && err == rpc.ErrShutdown {
authClient.isLoggedIn = false
} }
// set token, time stamp as received from a successful login call.
authClient.token = token
authClient.tstamp = tstamp
// Update the RPC call's token with that received from the recent login call.
args.SetToken(token)
args.SetTimestamp(tstamp)
}
err := authClient.rpc.Call(serviceMethod, args, reply)
// Reset token on disconnect to mark for re-login on subsequent reconnect.
if err != nil && err == rpc.ErrShutdown {
authClient.token = ""
} }
return err return err
} }

View File

@ -57,8 +57,14 @@ type lockServer struct {
timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart. timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart.
} }
func (l *lockServer) verifyTimestamp(tstamp time.Time) bool { func (l *lockServer) verifyArgs(args *LockArgs) error {
return l.timestamp.Equal(tstamp) if !l.timestamp.Equal(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.")
}
if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token")
}
return nil
} }
/// Distributed lock handlers /// Distributed lock handlers
@ -85,8 +91,8 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro
func (l *lockServer) Lock(args *LockArgs, reply *bool) error { func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.verifyTimestamp(args.Timestamp) { if err := l.verifyArgs(args); err != nil {
return errors.New("Timestamps don't match, server may have restarted.") return err
} }
_, ok := l.lockMap[args.Name] _, ok := l.lockMap[args.Name]
// No locks held on the given name. // No locks held on the given name.
@ -104,8 +110,8 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.verifyTimestamp(args.Timestamp) { if err := l.verifyArgs(args); err != nil {
return errors.New("Timestamps don't match, server may have restarted.") return err
} }
_, ok := l.lockMap[args.Name] _, ok := l.lockMap[args.Name]
// No lock is held on the given name, there must be some issue at the lock client side. // No lock is held on the given name, there must be some issue at the lock client side.
@ -120,8 +126,8 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
func (l *lockServer) RLock(args *LockArgs, reply *bool) error { func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.verifyTimestamp(args.Timestamp) { if err := l.verifyArgs(args); err != nil {
return errors.New("Timestamps don't match, server may have restarted.") return err
} }
locksHeld, ok := l.lockMap[args.Name] locksHeld, ok := l.lockMap[args.Name]
// No locks held on the given name. // No locks held on the given name.
@ -144,8 +150,8 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.verifyTimestamp(args.Timestamp) { if err := l.verifyArgs(args); err != nil {
return errors.New("Timestamps don't match, server may have restarted.") return err
} }
locksHeld, ok := l.lockMap[args.Name] locksHeld, ok := l.lockMap[args.Name]
if !ok { if !ok {

View File

@ -18,7 +18,7 @@ package cmd
import ( import (
"errors" "errors"
pathpkg "path" pathutil "path"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -31,39 +31,20 @@ var nsMutex *nsLockMap
// Initialize distributed locking only in case of distributed setup. // Initialize distributed locking only in case of distributed setup.
// Returns if the setup is distributed or not on success. // Returns if the setup is distributed or not on success.
func initDsyncNodes(disks []string, port int) (bool, error) { func initDsyncNodes(disks []string, port int) error {
// Holds a bool indicating whether this server instance is part of
// distributed setup or not.
var isDist = false
// List of lock servers that part in the co-operative namespace locking.
var dsyncNodes []string
// Corresponding rpc paths needed for communication over net/rpc
var rpcPaths []string
// Port to connect to for the lock servers in a distributed setup.
serverPort := strconv.Itoa(port) serverPort := strconv.Itoa(port)
cred := serverConfig.GetCredential()
loginMethod := "Dsync.LoginHandler"
// Initialize rpc lock client information only if this instance is a distributed setup.
var clnts []dsync.RPC
for _, disk := range disks { for _, disk := range disks {
if idx := strings.LastIndex(disk, ":"); idx != -1 { if idx := strings.LastIndex(disk, ":"); idx != -1 {
dsyncNodes = append(dsyncNodes, disk[:idx]+":"+serverPort) dsyncAddr := disk[:idx] + ":" + serverPort // Construct a new dsync server addr.
rpcPaths = append(rpcPaths, pathpkg.Join(lockRPCPath, disk[idx+1:])) rpcPath := pathutil.Join(lockRPCPath, disk[idx+1:]) // Construct a new rpc path for the disk.
} clnts = append(clnts, newAuthClient(dsyncAddr, rpcPath, cred, loginMethod))
if !isLocalStorage(disk) {
// One or more disks supplied as arguments are not
// attached to the local node.
isDist = true
} }
} }
// Initialize rpc lock client information only if this instance is a return dsync.SetNodesWithClients(clnts)
// distributed setup.
clnts := make([]dsync.RPC, len(disks))
for i := 0; i < len(disks); i++ {
clnts[i] = newAuthClient(dsyncNodes[i], rpcPaths[i], serverConfig.GetCredential(), "Dsync.LoginHandler")
}
if isDist {
return isDist, dsync.SetNodesWithClients(clnts)
}
return isDist, nil
} }
// initNSLock - initialize name space lock map. // initNSLock - initialize name space lock map.
@ -112,7 +93,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
nsLk = &nsLock{ nsLk = &nsLock{
RWLocker: func() RWLocker { RWLocker: func() RWLocker {
if n.isDist { if n.isDist {
return dsync.NewDRWMutex(pathpkg.Join(volume, path)) return dsync.NewDRWMutex(pathutil.Join(volume, path))
} }
return &sync.RWMutex{} return &sync.RWMutex{}
}(), }(),

View File

@ -227,6 +227,19 @@ func getPort(address string) int {
return portInt return portInt
} }
// Returns if slice of disks is a distributed setup.
func isDistributedSetup(disks []string) (isDist bool) {
// Port to connect to for the lock servers in a distributed setup.
for _, disk := range disks {
if !isLocalStorage(disk) {
// One or more disks supplied as arguments are not
// attached to the local node.
isDist = true
}
}
return isDist
}
// serverMain handler called for 'minio server' command. // serverMain handler called for 'minio server' command.
func serverMain(c *cli.Context) { func serverMain(c *cli.Context) {
// Check 'server' cli arguments. // Check 'server' cli arguments.
@ -252,10 +265,12 @@ func serverMain(c *cli.Context) {
// Disks to be used in server init. // Disks to be used in server init.
disks := c.Args() disks := c.Args()
// Set nodes for dsync isDist := isDistributedSetup(disks)
var isDist bool // Set nodes for dsync for distributed setup.
isDist, err = initDsyncNodes(disks, port) if isDist {
fatalIf(err, "Unable to initialize distributed locking") err = initDsyncNodes(disks, port)
fatalIf(err, "Unable to initialize distributed locking")
}
// Initialize name space lock. // Initialize name space lock.
initNSLock(isDist) initNSLock(isDist)

View File

@ -114,7 +114,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
// MakeVol - make a volume. // MakeVol - make a volume.
func (n networkStorage) MakeVol(volume string) error { func (n networkStorage) MakeVol(volume string) error {
reply := GenericReply{} reply := GenericReply{}
args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -124,7 +124,7 @@ func (n networkStorage) MakeVol(volume string) error {
// ListVols - List all volumes. // ListVols - List all volumes.
func (n networkStorage) ListVols() (vols []VolInfo, err error) { func (n networkStorage) ListVols() (vols []VolInfo, err error) {
ListVols := ListVolsReply{} ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, &ListVols) err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -133,7 +133,7 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) {
// StatVol - get current Stat volume info. // StatVol - get current Stat volume info.
func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} args := GenericVolArgs{Vol: volume}
if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil {
return VolInfo{}, toStorageErr(err) return VolInfo{}, toStorageErr(err)
} }
@ -143,7 +143,7 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
// DeleteVol - Delete a volume. // DeleteVol - Delete a volume.
func (n networkStorage) DeleteVol(volume string) error { func (n networkStorage) DeleteVol(volume string) error {
reply := GenericReply{} reply := GenericReply{}
args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume} args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -156,10 +156,9 @@ func (n networkStorage) DeleteVol(volume string) error {
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) {
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path, Buffer: buffer,
Buffer: buffer,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -169,9 +168,8 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro
// StatFile - get latest Stat information for a file at path. // StatFile - get latest Stat information for a file at path.
func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) {
if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path,
}, &fileInfo); err != nil { }, &fileInfo); err != nil {
return FileInfo{}, toStorageErr(err) return FileInfo{}, toStorageErr(err)
} }
@ -184,9 +182,8 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er
// not use this on large files as it would cause server to crash. // not use this on large files as it would cause server to crash.
func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path,
}, &buf); err != nil { }, &buf); err != nil {
return nil, toStorageErr(err) return nil, toStorageErr(err)
} }
@ -197,11 +194,10 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) {
var result []byte var result []byte
err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path, Offset: offset,
Offset: offset, Size: len(buffer),
Size: len(buffer),
}, &result) }, &result)
// Copy results to buffer. // Copy results to buffer.
copy(buffer, result) copy(buffer, result)
@ -212,9 +208,8 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe
// ListDir - list all entries at prefix. // ListDir - list all entries at prefix.
func (n networkStorage) ListDir(volume, path string) (entries []string, err error) { func (n networkStorage) ListDir(volume, path string) (entries []string, err error) {
if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path,
}, &entries); err != nil { }, &entries); err != nil {
return nil, toStorageErr(err) return nil, toStorageErr(err)
} }
@ -226,9 +221,8 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro
func (n networkStorage) DeleteFile(volume, path string) (err error) { func (n networkStorage) DeleteFile(volume, path string) (err error) {
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{
GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, Vol: volume,
Vol: volume, Path: path,
Path: path,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -239,11 +233,10 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) {
func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{
GenericArgs: GenericArgs{Token: n.rpcClient.token}, SrcVol: srcVolume,
SrcVol: srcVolume, SrcPath: srcPath,
SrcPath: srcPath, DstVol: dstVolume,
DstVol: dstVolume, DstPath: dstPath,
DstPath: dstPath,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }

View File

@ -18,16 +18,8 @@ package cmd
import "time" import "time"
// TokenSetter is to be implemented by types that need a way to update member that represents a token.
// e.g, See GenericArgs.
type TokenSetter interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}
// GenericReply represents any generic RPC reply. // GenericReply represents any generic RPC reply.
type GenericReply struct { type GenericReply struct{}
}
// GenericArgs represents any generic RPC arguments. // GenericArgs represents any generic RPC arguments.
type GenericArgs struct { type GenericArgs struct {
@ -78,6 +70,7 @@ type ListVolsReply struct {
type ReadAllArgs struct { type ReadAllArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
GenericArgs GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -89,6 +82,7 @@ type ReadAllArgs struct {
type ReadFileArgs struct { type ReadFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
GenericArgs GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string