Adopt dsync interface changes and major cleanup on RPC server/client.

* Rename GenericArgs to AuthRPCArgs
* Rename GenericReply to AuthRPCReply
* Remove authConfig.loginMethod and add authConfig.ServiceName
* Rename loginServer to AuthRPCServer
* Rename RPCLoginArgs to LoginRPCArgs
* Rename RPCLoginReply to LoginRPCReply
* Version and RequestTime are added to LoginRPCArgs and verified by
  server side, not client side.
* Fix data race in lockMaintainence loop.
This commit is contained in:
Bala.FA
2016-12-23 20:42:19 +05:30
parent cde6496172
commit 6d10f4c19a
39 changed files with 1083 additions and 949 deletions

View File

@@ -23,18 +23,14 @@ import (
"net/rpc"
"net/url"
"path"
"sync"
"sync/atomic"
"time"
"github.com/minio/minio/pkg/disk"
)
type networkStorage struct {
networkIOErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
netAddr string
netPath string
rpcClient *storageRPCClient
rpcClient *AuthRPCClient
}
const (
@@ -99,104 +95,6 @@ func toStorageErr(err error) error {
return err
}
// storageRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects.
type storageRPCClient struct {
sync.Mutex
cfg storageConfig
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
serverToken string // Disk rpc JWT based token.
serverVersion string // Server version exchanged by the RPC.
}
// Storage config represents authentication credentials and Login
// method name to be used for fetching JWT tokens from the storage
// server.
type storageConfig struct {
addr string // Network address path of storage RPC server.
path string // Network storage path for HTTP dial.
secureConn bool // Indicates if this storage RPC is on a secured connection.
creds credential
}
// newStorageClient - returns a jwt based authenticated (go) storage rpc client.
func newStorageClient(storageCfg storageConfig) *storageRPCClient {
return &storageRPCClient{
// Save the config.
cfg: storageCfg,
rpc: newRPCClient(storageCfg.addr, storageCfg.path, storageCfg.secureConn),
}
}
// Close - closes underlying rpc connection.
func (storageClient *storageRPCClient) Close() error {
storageClient.Lock()
// reset token on closing a connection
storageClient.serverToken = ""
storageClient.Unlock()
return storageClient.rpc.Close()
}
// Login - a jwt based authentication is performed with rpc server.
func (storageClient *storageRPCClient) Login() (err error) {
storageClient.Lock()
// As soon as the function returns unlock,
defer storageClient.Unlock()
// Return if token is already set.
if storageClient.serverToken != "" {
return nil
}
reply := RPCLoginReply{}
if err = storageClient.rpc.Call("Storage.LoginHandler", RPCLoginArgs{
Username: storageClient.cfg.creds.AccessKey,
Password: storageClient.cfg.creds.SecretKey,
}, &reply); err != nil {
return err
}
// Validate if version do indeed match.
if reply.ServerVersion != Version {
return errServerVersionMismatch
}
// Validate if server timestamp is skewed.
curTime := time.Now().UTC()
if curTime.Sub(reply.Timestamp) > globalMaxSkewTime {
return errServerTimeMismatch
}
// Set token, time stamp as received from a successful login call.
storageClient.serverToken = reply.Token
storageClient.serverVersion = reply.ServerVersion
return nil
}
// Call - If rpc connection isn't established yet since previous disconnect,
// connection is established, a jwt authenticated login is performed and then
// the call is performed.
func (storageClient *storageRPCClient) Call(serviceMethod string, args interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}, reply interface{}) (err error) {
// On successful login, attempt the call.
if err = storageClient.Login(); err != nil {
return err
}
// Set token and timestamp before the rpc call.
args.SetToken(storageClient.serverToken)
args.SetTimestamp(time.Now().UTC())
// Call the underlying rpc.
err = storageClient.rpc.Call(serviceMethod, args, reply)
// Invalidate token, and mark it for re-login.
if err == rpc.ErrShutdown {
storageClient.Close()
}
return err
}
// Initialize new storage rpc client.
func newStorageRPC(ep *url.URL) (StorageAPI, error) {
if ep == nil {
@@ -207,38 +105,35 @@ func newStorageRPC(ep *url.URL) (StorageAPI, error) {
rpcPath := path.Join(storageRPCPath, getPath(ep))
rpcAddr := ep.Host
// Initialize rpc client with network address and rpc path.
accessKey := serverConfig.GetCredential().AccessKey
secretKey := serverConfig.GetCredential().SecretKey
serverCred := serverConfig.GetCredential()
accessKey := serverCred.AccessKey
secretKey := serverCred.SecretKey
if ep.User != nil {
accessKey = ep.User.Username()
if key, set := ep.User.Password(); set {
secretKey = key
if password, ok := ep.User.Password(); ok {
secretKey = password
}
}
// Initialize network storage.
ndisk := &networkStorage{
netAddr: ep.Host,
netPath: getPath(ep),
rpcClient: newStorageClient(storageConfig{
addr: rpcAddr,
path: rpcPath,
creds: credential{
AccessKey: accessKey,
SecretKey: secretKey,
},
secureConn: isSSL(),
storageAPI := &networkStorage{
rpcClient: newAuthRPCClient(authConfig{
accessKey: accessKey,
secretKey: secretKey,
serverAddr: rpcAddr,
serviceEndpoint: rpcPath,
secureConn: isSSL(),
serviceName: "Storage",
disableReconnect: true,
}),
}
// Returns successfully here.
return ndisk, nil
return storageAPI, nil
}
// Stringer interface compatible representation of network device.
func (n *networkStorage) String() string {
return n.netAddr + ":" + n.netPath
return n.rpcClient.ServerAddr() + ":" + n.rpcClient.ServiceEndpoint()
}
// Network IO error count is kept at 256 with some simple
@@ -250,10 +145,9 @@ func (n *networkStorage) String() string {
// incoming i/o.
const maxAllowedNetworkIOError = 256 // maximum allowed network IOError.
// Initializes the remote RPC connection by attempting a login attempt.
func (n *networkStorage) Init() (err error) {
// Attempt a login to reconnect.
err = n.rpcClient.Login()
// Init - attempts a login to reconnect.
func (n *networkStorage) Init() error {
err := n.rpcClient.Login()
return toStorageErr(err)
}
@@ -278,7 +172,7 @@ func (n *networkStorage) DiskInfo() (info disk.Info, err error) {
return disk.Info{}, errFaultyRemoteDisk
}
args := GenericArgs{}
args := AuthRPCArgs{}
if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil {
return disk.Info{}, toStorageErr(err)
}
@@ -299,7 +193,7 @@ func (n *networkStorage) MakeVol(volume string) (err error) {
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil {
return toStorageErr(err)
@@ -322,7 +216,7 @@ func (n *networkStorage) ListVols() (vols []VolInfo, err error) {
}
ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols)
err = n.rpcClient.Call("Storage.ListVolsHandler", &AuthRPCArgs{}, &ListVols)
if err != nil {
return nil, toStorageErr(err)
}
@@ -364,7 +258,7 @@ func (n *networkStorage) DeleteVol(volume string) (err error) {
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil {
return toStorageErr(err)
@@ -386,7 +280,7 @@ func (n *networkStorage) PrepareFile(volume, path string, length int64) (err err
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{
Vol: volume,
Path: path,
@@ -411,7 +305,7 @@ func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err err
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{
Vol: volume,
Path: path,
@@ -545,7 +439,7 @@ func (n *networkStorage) DeleteFile(volume, path string) (err error) {
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{
Vol: volume,
Path: path,
@@ -569,7 +463,7 @@ func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath strin
return errFaultyRemoteDisk
}
reply := GenericReply{}
reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{
SrcVol: srcVolume,
SrcPath: srcPath,