Layered rpc-client implementation (#2512)

This commit is contained in:
Krishnan Parthasarathi 2016-08-22 11:01:21 -07:00 committed by Harshavardhana
parent 7e3e24b394
commit bda6bcd5be
11 changed files with 307 additions and 258 deletions

79
cmd/auth-rpc-client.go Normal file
View File

@ -0,0 +1,79 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"time"
"github.com/minio/dsync"
)
// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects.
type AuthRPCClient struct {
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
cred credential // AccessKey and SecretKey
token string // JWT based token
tstamp time.Time // Timestamp as received on Login RPC.
loginMethod string // RPC service name for authenticating using JWT
}
// newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect.
func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *AuthRPCClient {
return &AuthRPCClient{
rpc: newClient(node, rpcPath),
cred: cred,
loginMethod: loginMethod,
}
}
// Close - closes underlying rpc connection.
func (authClient *AuthRPCClient) Close() error {
// reset token on closing a connection
authClient.token = ""
return authClient.rpc.Close()
}
// Login - a jwt based authentication is performed with rpc server.
func (authClient *AuthRPCClient) Login() (string, time.Time, error) {
reply := RPCLoginReply{}
if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{
Username: authClient.cred.AccessKeyID,
Password: authClient.cred.SecretAccessKey,
}, &reply); err != nil {
return "", time.Time{}, err
}
return reply.Token, reply.Timestamp, nil
}
// Call - If rpc connection isn't established yet since previous disconnect,
// connection is established, a jwt authenticated login is performed and then
// the call is performed.
func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) (err error) {
if authClient.token == "" {
token, tstamp, err := authClient.Login()
if err != nil {
return err
}
// set token, time stamp as received from a successful login call.
authClient.token = token
authClient.tstamp = tstamp
// Update the RPC call's token with that received from the recent login call.
args.SetToken(token)
args.SetTimestamp(tstamp)
}
return authClient.rpc.Call(serviceMethod, args, reply)
}

View File

@ -17,36 +17,82 @@
package cmd package cmd
import ( import (
"errors"
"fmt" "fmt"
"net/rpc" "net/rpc"
"path" "path"
"strings" "strings"
"sync" "sync"
"time"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
) )
const lockRPCPath = "/minio/lock" const lockRPCPath = "/minio/lock"
// LockArgs besides lock name, holds Token and Timestamp for session
// authentication and validation server restart.
type LockArgs struct {
Name string
Token string
Timestamp time.Time
}
// SetToken - sets the token to the supplied value.
func (l *LockArgs) SetToken(token string) {
l.Token = token
}
// SetTimestamp - sets the timestamp to the supplied value.
func (l *LockArgs) SetTimestamp(tstamp time.Time) {
l.Timestamp = tstamp
}
type lockServer struct { type lockServer struct {
rpcPath string rpcPath string
mutex sync.Mutex mutex sync.Mutex
// e.g, when a Lock(name) is held, map[string][]bool{"name" : []bool{true}} // e.g, when a Lock(name) is held, map[string][]bool{"name" : []bool{true}}
// when one or more RLock() is held, map[string][]bool{"name" : []bool{false, false}} // when one or more RLock() is held, map[string][]bool{"name" : []bool{false, false}}
lockMap map[string][]bool lockMap map[string][]bool
timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart.
}
func (l *lockServer) verifyTimestamp(tstamp time.Time) bool {
return l.timestamp.Equal(tstamp)
} }
/// Distributed lock handlers /// Distributed lock handlers
// LoginHandler - handles LoginHandler RPC call.
func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
jwt, err := newJWT(defaultTokenExpiry)
if err != nil {
return err
}
if err = jwt.Authenticate(args.Username, args.Password); err != nil {
return err
}
token, err := jwt.GenerateToken(args.Username)
if err != nil {
return err
}
reply.Token = token
reply.Timestamp = l.timestamp
return nil
}
// LockHandler - rpc handler for lock operation. // LockHandler - rpc handler for lock operation.
func (l *lockServer) Lock(name *string, reply *bool) error { func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
_, ok := l.lockMap[*name] if l.verifyTimestamp(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.")
}
_, ok := l.lockMap[args.Name]
// No locks held on the given name. // No locks held on the given name.
if !ok { if !ok {
*reply = true *reply = true
l.lockMap[*name] = []bool{true} l.lockMap[args.Name] = []bool{true}
return nil return nil
} }
// Either a read or write lock is held on the given name. // Either a read or write lock is held on the given name.
@ -55,56 +101,65 @@ func (l *lockServer) Lock(name *string, reply *bool) error {
} }
// UnlockHandler - rpc handler for unlock operation. // UnlockHandler - rpc handler for unlock operation.
func (l *lockServer) Unlock(name *string, reply *bool) error { func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
_, ok := l.lockMap[*name] if l.verifyTimestamp(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.")
}
_, ok := l.lockMap[args.Name]
// No lock is held on the given name, there must be some issue at the lock client side. // No lock is held on the given name, there must be some issue at the lock client side.
if !ok { if !ok {
return fmt.Errorf("Unlock attempted on an un-locked entity: %s", *name) return fmt.Errorf("Unlock attempted on an un-locked entity: %s", args.Name)
} }
*reply = true *reply = true
delete(l.lockMap, *name) delete(l.lockMap, args.Name)
return nil return nil
} }
func (l *lockServer) RLock(name *string, reply *bool) error { func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
locksHeld, ok := l.lockMap[*name] if l.verifyTimestamp(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.")
}
locksHeld, ok := l.lockMap[args.Name]
// No locks held on the given name. // No locks held on the given name.
if !ok { if !ok {
// First read-lock to be held on *name. // First read-lock to be held on *name.
l.lockMap[*name] = []bool{false} l.lockMap[args.Name] = []bool{false}
*reply = true *reply = true
} else if len(locksHeld) == 1 && locksHeld[0] == true { } else if len(locksHeld) == 1 && locksHeld[0] == true {
// A write-lock is held, read lock can't be granted. // A write-lock is held, read lock can't be granted.
*reply = false *reply = false
} else { } else {
// Add an entry for this read lock. // Add an entry for this read lock.
l.lockMap[*name] = append(locksHeld, false) l.lockMap[args.Name] = append(locksHeld, false)
*reply = true *reply = true
} }
return nil return nil
} }
func (l *lockServer) RUnlock(name *string, reply *bool) error { func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
locksHeld, ok := l.lockMap[*name] if l.verifyTimestamp(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.")
}
locksHeld, ok := l.lockMap[args.Name]
if !ok { if !ok {
return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", *name) return fmt.Errorf("RUnlock attempted on an un-locked entity: %s", args.Name)
} }
if len(locksHeld) > 1 { if len(locksHeld) > 1 {
// Remove one of the read locks held. // Remove one of the read locks held.
locksHeld = locksHeld[1:] locksHeld = locksHeld[1:]
l.lockMap[*name] = locksHeld l.lockMap[args.Name] = locksHeld
*reply = true *reply = true
} else { } else {
// Delete the map entry since this is the last read lock held // Delete the map entry since this is the last read lock held
// on *name. // on *name.
delete(l.lockMap, *name) delete(l.lockMap, args.Name)
*reply = true *reply = true
} }
return nil return nil
@ -136,9 +191,10 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
export = export[idx+1:] export = export[idx+1:]
} }
lockServers = append(lockServers, &lockServer{ lockServers = append(lockServers, &lockServer{
rpcPath: export, rpcPath: export,
mutex: sync.Mutex{}, mutex: sync.Mutex{},
lockMap: make(map[string][]bool), lockMap: make(map[string][]bool),
timestamp: time.Now().UTC(),
}) })
} }
} }

View File

@ -56,8 +56,12 @@ func initDsyncNodes(disks []string, port int) (bool, error) {
} }
// Initialize rpc lock client information only if this instance is a // Initialize rpc lock client information only if this instance is a
// distributed setup. // distributed setup.
clnts := make([]dsync.RPC, len(disks))
for i := 0; i < len(disks); i++ {
clnts[i] = newAuthClient(dsyncNodes[i], rpcPaths[i], serverConfig.GetCredential(), "Dsync.LoginHandler")
}
if isDist { if isDist {
return isDist, dsync.SetNodesWithPath(dsyncNodes, rpcPaths) return isDist, dsync.SetNodesWithClients(clnts)
} }
return isDist, nil return isDist, nil
} }

View File

@ -27,8 +27,7 @@ type networkStorage struct {
netScheme string netScheme string
netAddr string netAddr string
netPath string netPath string
rpcClient *RPCClient rpcClient *AuthRPCClient
rpcToken string
} }
const ( const (
@ -75,22 +74,6 @@ func toStorageErr(err error) error {
return err return err
} }
// Login rpc client makes an authentication request to the rpc server.
// Receives a session token which will be used for subsequent requests.
// FIXME: Currently these tokens expire in 100yrs.
func loginRPCClient(rpcClient *RPCClient) (tokenStr string, err error) {
cred := serverConfig.GetCredential()
reply := RPCLoginReply{}
if err = rpcClient.Call("Storage.LoginHandler", RPCLoginArgs{
Username: cred.AccessKeyID,
Password: cred.SecretAccessKey,
}, &reply); err != nil {
return "", err
}
// Reply back server provided token.
return reply.Token, nil
}
// Initialize new rpc client. // Initialize new rpc client.
func newRPCClient(networkPath string) (StorageAPI, error) { func newRPCClient(networkPath string) (StorageAPI, error) {
// Input validation. // Input validation.
@ -109,16 +92,8 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
port := getPort(srvConfig.serverAddr) port := getPort(srvConfig.serverAddr)
rpcAddr := netAddr + ":" + strconv.Itoa(port) rpcAddr := netAddr + ":" + strconv.Itoa(port)
// Initialize rpc client with network address and rpc path. // Initialize rpc client with network address and rpc path.
rpcClient := newClient(rpcAddr, rpcPath) cred := serverConfig.GetCredential()
rpcClient := newAuthClient(rpcAddr, rpcPath, cred, "Storage.LoginHandler")
token, err := loginRPCClient(rpcClient)
if err != nil {
// Close the corresponding network connection w/ server to
// avoid leaking socket file descriptor.
rpcClient.Close()
return nil, err
}
// Initialize network storage. // Initialize network storage.
ndisk := &networkStorage{ ndisk := &networkStorage{
netScheme: func() string { netScheme: func() string {
@ -130,7 +105,6 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
netAddr: netAddr, netAddr: netAddr,
netPath: netPath, netPath: netPath,
rpcClient: rpcClient, rpcClient: rpcClient,
rpcToken: token,
} }
// Returns successfully here. // Returns successfully here.
@ -143,8 +117,8 @@ func (n networkStorage) MakeVol(volume string) error {
return errVolumeBusy return errVolumeBusy
} }
reply := GenericReply{} reply := GenericReply{}
args := GenericVolArgs{n.rpcToken, volume} args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume}
if err := n.rpcClient.Call("Storage.MakeVolHandler", args, &reply); err != nil { if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
return nil return nil
@ -156,7 +130,7 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) {
return nil, errVolumeBusy return nil, errVolumeBusy
} }
ListVols := ListVolsReply{} ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", n.rpcToken, &ListVols) err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, &ListVols)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -168,8 +142,8 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
if n.rpcClient == nil { if n.rpcClient == nil {
return VolInfo{}, errVolumeBusy return VolInfo{}, errVolumeBusy
} }
args := GenericVolArgs{n.rpcToken, volume} args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume}
if err = n.rpcClient.Call("Storage.StatVolHandler", args, &volInfo); err != nil { if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil {
return VolInfo{}, toStorageErr(err) return VolInfo{}, toStorageErr(err)
} }
return volInfo, nil return volInfo, nil
@ -181,8 +155,8 @@ func (n networkStorage) DeleteVol(volume string) error {
return errVolumeBusy return errVolumeBusy
} }
reply := GenericReply{} reply := GenericReply{}
args := GenericVolArgs{n.rpcToken, volume} args := GenericVolArgs{GenericArgs{n.rpcClient.token, n.rpcClient.tstamp}, volume}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", args, &reply); err != nil { if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
return nil return nil
@ -196,11 +170,11 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro
return errVolumeBusy return errVolumeBusy
} }
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{ if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
Buffer: buffer, Buffer: buffer,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -212,10 +186,10 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er
if n.rpcClient == nil { if n.rpcClient == nil {
return FileInfo{}, errVolumeBusy return FileInfo{}, errVolumeBusy
} }
if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
}, &fileInfo); err != nil { }, &fileInfo); err != nil {
return FileInfo{}, toStorageErr(err) return FileInfo{}, toStorageErr(err)
} }
@ -230,10 +204,10 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
if n.rpcClient == nil { if n.rpcClient == nil {
return nil, errVolumeBusy return nil, errVolumeBusy
} }
if err = n.rpcClient.Call("Storage.ReadAllHandler", ReadAllArgs{ if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
}, &buf); err != nil { }, &buf); err != nil {
return nil, toStorageErr(err) return nil, toStorageErr(err)
} }
@ -246,12 +220,12 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe
return 0, errVolumeBusy return 0, errVolumeBusy
} }
var result []byte var result []byte
err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{ err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
Offset: offset, Offset: offset,
Size: len(buffer), Size: len(buffer),
}, &result) }, &result)
// Copy results to buffer. // Copy results to buffer.
copy(buffer, result) copy(buffer, result)
@ -264,10 +238,10 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro
if n.rpcClient == nil { if n.rpcClient == nil {
return nil, errVolumeBusy return nil, errVolumeBusy
} }
if err = n.rpcClient.Call("Storage.ListDirHandler", ListDirArgs{ if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
}, &entries); err != nil { }, &entries); err != nil {
return nil, toStorageErr(err) return nil, toStorageErr(err)
} }
@ -281,10 +255,10 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) {
return errVolumeBusy return errVolumeBusy
} }
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{n.rpcClient.token, n.rpcClient.tstamp},
Vol: volume, Vol: volume,
Path: path, Path: path,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }
@ -297,12 +271,12 @@ func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string
return errVolumeBusy return errVolumeBusy
} }
reply := GenericReply{} reply := GenericReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", RenameFileArgs{ if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{
Token: n.rpcToken, GenericArgs: GenericArgs{Token: n.rpcClient.token},
SrcVol: srcVolume, SrcVol: srcVolume,
SrcPath: srcPath, SrcPath: srcPath,
DstVol: dstVolume, DstVol: dstVolume,
DstPath: dstPath, DstPath: dstPath,
}, &reply); err != nil { }, &reply); err != nil {
return toStorageErr(err) return toStorageErr(err)
} }

View File

@ -16,6 +16,35 @@
package cmd package cmd
import "time"
// TokenSetter is to be implemented by types that need a way to update member that represents a token.
// e.g, See GenericArgs.
type TokenSetter interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}
// GenericReply represents any generic RPC reply.
type GenericReply struct {
}
// GenericArgs represents any generic RPC arguments.
type GenericArgs struct {
Token string // Used to authenticate every RPC call.
Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair.
}
// SetToken - sets the token to the supplied value.
func (ga *GenericArgs) SetToken(token string) {
ga.Token = token
}
// SetTimestamp - sets the timestamp to the supplied value.
func (ga *GenericArgs) SetTimestamp(tstamp time.Time) {
ga.Timestamp = tstamp
}
// RPCLoginArgs - login username and password for RPC. // RPCLoginArgs - login username and password for RPC.
type RPCLoginArgs struct { type RPCLoginArgs struct {
Username string Username string
@ -27,18 +56,13 @@ type RPCLoginArgs struct {
type RPCLoginReply struct { type RPCLoginReply struct {
Token string Token string
ServerVersion string ServerVersion string
Timestamp time.Time
} }
// GenericReply represents any generic RPC reply.
type GenericReply struct{}
// GenericArgs represents any generic RPC arguments.
type GenericArgs struct{}
// GenericVolArgs - generic volume args. // GenericVolArgs - generic volume args.
type GenericVolArgs struct { type GenericVolArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -53,8 +77,7 @@ type ListVolsReply struct {
// ReadAllArgs represents read all RPC arguments. // ReadAllArgs represents read all RPC arguments.
type ReadAllArgs struct { type ReadAllArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -65,8 +88,7 @@ type ReadAllArgs struct {
// ReadFileArgs represents read file RPC arguments. // ReadFileArgs represents read file RPC arguments.
type ReadFileArgs struct { type ReadFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -83,7 +105,7 @@ type ReadFileArgs struct {
// AppendFileArgs represents append file RPC arguments. // AppendFileArgs represents append file RPC arguments.
type AppendFileArgs struct { type AppendFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -98,7 +120,7 @@ type AppendFileArgs struct {
// StatFileArgs represents stat file RPC arguments. // StatFileArgs represents stat file RPC arguments.
type StatFileArgs struct { type StatFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -110,7 +132,7 @@ type StatFileArgs struct {
// DeleteFileArgs represents delete file RPC arguments. // DeleteFileArgs represents delete file RPC arguments.
type DeleteFileArgs struct { type DeleteFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -122,7 +144,7 @@ type DeleteFileArgs struct {
// ListDirArgs represents list contents RPC arguments. // ListDirArgs represents list contents RPC arguments.
type ListDirArgs struct { type ListDirArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of the volume. // Name of the volume.
Vol string Vol string
@ -134,7 +156,7 @@ type ListDirArgs struct {
// RenameFileArgs represents rename file RPC arguments. // RenameFileArgs represents rename file RPC arguments.
type RenameFileArgs struct { type RenameFileArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.
Token string GenericArgs
// Name of source volume. // Name of source volume.
SrcVol string SrcVol string

View File

@ -88,8 +88,8 @@ func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply
} }
// ListVolsHandler - list vols handler is rpc wrapper for ListVols operation. // ListVolsHandler - list vols handler is rpc wrapper for ListVols operation.
func (s *storageServer) ListVolsHandler(token *string, reply *ListVolsReply) error { func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error {
if !isRPCTokenValid(*token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errors.New("Invalid token")
} }
vols, err := s.storage.ListVols() vols, err := s.storage.ListVols()

View File

@ -19,7 +19,6 @@ package dsync
import ( import (
"math" "math"
"math/rand" "math/rand"
"net/rpc"
"sync" "sync"
"time" "time"
) )
@ -40,6 +39,20 @@ type Granted struct {
uid string uid string
} }
type LockArgs struct {
Token string
Timestamp time.Time
Name string
}
func (l *LockArgs) SetToken(token string) {
l.Token = token
}
func (l *LockArgs) SetTimestamp(tstamp time.Time) {
l.Timestamp = tstamp
}
func NewDRWMutex(name string) *DRWMutex { func NewDRWMutex(name string) *DRWMutex {
return &DRWMutex{ return &DRWMutex{
Name: name, Name: name,
@ -48,28 +61,6 @@ func NewDRWMutex(name string) *DRWMutex {
} }
} }
// Connect to respective lock server nodes on the first Lock() call.
func connectLazy() {
if clnts == nil {
panic("rpc client connections weren't initialized.")
}
for i := range clnts {
if clnts[i].rpc != nil {
continue
}
// Pass in unique path (as required by server.HandleHTTP().
// Ignore failure to connect, the lock server node may join the
// cluster later.
clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
if err != nil {
clnts[i].SetRPC(nil)
continue
}
clnts[i].SetRPC(clnt)
}
}
// RLock holds a read lock on dm. // RLock holds a read lock on dm.
// //
// If the lock is already in use, the calling goroutine // If the lock is already in use, the calling goroutine
@ -83,7 +74,6 @@ func (dm *DRWMutex) RLock() {
runs, backOff := 1, 1 runs, backOff := 1, 1
for { for {
connectLazy()
// create temp arrays on stack // create temp arrays on stack
locks := make([]bool, dnodeCount) locks := make([]bool, dnodeCount)
@ -128,8 +118,6 @@ func (dm *DRWMutex) Lock() {
runs, backOff := 1, 1 runs, backOff := 1, 1
for { for {
connectLazy()
// create temp arrays on stack // create temp arrays on stack
locks := make([]bool, dnodeCount) locks := make([]bool, dnodeCount)
ids := make([]string, dnodeCount) ids := make([]string, dnodeCount)
@ -161,7 +149,7 @@ func (dm *DRWMutex) Lock() {
// lock tries to acquire the distributed lock, returning true or false // lock tries to acquire the distributed lock, returning true or false
// //
func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool {
// Create buffered channel of quorum size // Create buffered channel of quorum size
ch := make(chan Granted, dquorum) ch := make(chan Granted, dquorum)
@ -169,15 +157,15 @@ func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, is
for index, c := range clnts { for index, c := range clnts {
// broadcast lock request to all nodes // broadcast lock request to all nodes
go func(index int, isReadLock bool, c *RPCClient) { go func(index int, isReadLock bool, c RPC) {
// All client methods issuing RPCs are thread-safe and goroutine-safe, // All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running go routines. // i.e. it is safe to call them from multiple concurrently running go routines.
var status bool var status bool
var err error var err error
if isReadLock { if isReadLock {
err = c.Call("Dsync.RLock", lockName, &status) err = c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &status)
} else { } else {
err = c.Call("Dsync.Lock", lockName, &status) err = c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &status)
} }
locked, uid := false, "" locked, uid := false, ""
@ -185,14 +173,8 @@ func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, is
locked = status locked = status
// TODO: Get UIOD again // TODO: Get UIOD again
uid = "" uid = ""
} else {
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
if IsRPCError(err) {
clnts[index].SetRPC(nil)
}
// silently ignore error, retry later
} }
// silently ignore error, retry later
ch <- Granted{index: index, locked: locked, uid: uid} ch <- Granted{index: index, locked: locked, uid: uid}
@ -277,7 +259,7 @@ func quorumMet(locks *[]bool) bool {
} }
// releaseAll releases all locks that are marked as locked // releaseAll releases all locks that are marked as locked
func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { func releaseAll(clnts []RPC, locks *[]bool, ids *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < dnodeCount; lock++ { for lock := 0; lock < dnodeCount; lock++ {
if (*locks)[lock] { if (*locks)[lock] {
@ -337,38 +319,31 @@ func (dm *DRWMutex) Unlock() {
} }
// sendRelease sends a release message to a node that previously granted a lock // sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c *RPCClient, name, uid string, isReadLock bool) { func sendRelease(c RPC, name, uid string, isReadLock bool) {
backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour} backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour}
go func(c *RPCClient, name, uid string) { go func(c RPC, name, uid string) {
for _, backOff := range backOffArray { for _, backOff := range backOffArray {
// Make sure we are connected
connectLazy()
// All client methods issuing RPCs are thread-safe and goroutine-safe, // All client methods issuing RPCs are thread-safe and goroutine-safe,
// i.e. it is safe to call them from multiple concurrently running goroutines. // i.e. it is safe to call them from multiple concurrently running goroutines.
var status bool var status bool
var err error var err error
// TODO: Send UID to server // TODO: Send UID to server
if isReadLock { if isReadLock {
if err = c.Call("Dsync.RUnlock", name, &status); err == nil { if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil {
// RUnlock delivered, exit out // RUnlock delivered, exit out
return return
} }
} else { } else {
if err = c.Call("Dsync.Unlock", name, &status); err == nil { if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil {
// Unlock delivered, exit out // Unlock delivered, exit out
return return
} }
} }
// If rpc call failed due to connection related errors, reset rpc.Client object
// to trigger reconnect on subsequent Lock()/Unlock() requests to the same node.
c.SetRPC(nil)
// wait // wait
time.Sleep(backOff) time.Sleep(backOff)
} }

View File

@ -26,43 +26,30 @@ const DefaultPath = "/rpc/dsync"
// Number of nodes participating in the distributed locking. // Number of nodes participating in the distributed locking.
var dnodeCount int var dnodeCount int
// List of nodes participating.
var nodes []string
// List of rpc paths, one per lock server.
var rpcPaths []string
// List of rpc client objects, one per lock server. // List of rpc client objects, one per lock server.
var clnts []*RPCClient var clnts []RPC
// Simple majority based quorum, set to dNodeCount/2+1 // Simple majority based quorum, set to dNodeCount/2+1
var dquorum int var dquorum int
// SetNodesWithPath - initializes package-level global state variables such as // SetNodesWithPath - initializes package-level global state variables such as clnts.
// nodes, rpcPaths, clnts.
// N B - This function should be called only once inside any program that uses // N B - This function should be called only once inside any program that uses
// dsync. // dsync.
func SetNodesWithPath(nodeList []string, paths []string) (err error) { func SetNodesWithClients(rpcClnts []RPC) (err error) {
// Validate if number of nodes is within allowable range. // Validate if number of nodes is within allowable range.
if dnodeCount != 0 { if dnodeCount != 0 {
return errors.New("Cannot reinitialize dsync package") return errors.New("Cannot reinitialize dsync package")
} else if len(nodeList) < 4 { } else if len(rpcClnts) < 4 {
return errors.New("Dsync not designed for less than 4 nodes") return errors.New("Dsync not designed for less than 4 nodes")
} else if len(nodeList) > 16 { } else if len(rpcClnts) > 16 {
return errors.New("Dsync not designed for more than 16 nodes") return errors.New("Dsync not designed for more than 16 nodes")
} }
nodes = make([]string, len(nodeList)) dnodeCount = len(rpcClnts)
copy(nodes, nodeList[:])
rpcPaths = make([]string, len(paths))
copy(rpcPaths, paths[:])
dnodeCount = len(nodes)
dquorum = dnodeCount/2 + 1 dquorum = dnodeCount/2 + 1
clnts = make([]*RPCClient, dnodeCount)
// Initialize node name and rpc path for each RPCClient object. // Initialize node name and rpc path for each RPCClient object.
for i := range clnts { clnts = make([]RPC, dnodeCount)
clnts[i] = newClient(nodes[i], rpcPaths[i]) copy(clnts, rpcClnts)
}
return nil return nil
} }

29
vendor/github.com/minio/dsync/rpc-client-interface.go generated vendored Normal file
View File

@ -0,0 +1,29 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import "time"
type TokenSetter interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}
type RPC interface {
Call(serviceMethod string, args TokenSetter, reply interface{}) error
Close() error
}

View File

@ -1,77 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dsync
import (
"net/rpc"
"reflect"
"sync"
)
// Wrapper type for rpc.Client that provides connection management like
// reconnect on first failure.
type RPCClient struct {
sync.Mutex
rpc *rpc.Client
node string
rpcPath string
}
func newClient(node, rpcPath string) *RPCClient {
return &RPCClient{
node: node,
rpcPath: rpcPath,
}
}
func (rpcClient *RPCClient) SetRPC(rpc *rpc.Client) {
rpcClient.Lock()
defer rpcClient.Unlock()
rpcClient.rpc = rpc
}
func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
rpcClient.Lock()
defer rpcClient.Unlock()
if rpcClient.rpc == nil {
return rpc.ErrShutdown
}
err := rpcClient.rpc.Call(serviceMethod, args, reply)
return err
}
func (rpcClient *RPCClient) Reconnect() error {
rpcClient.Lock()
defer rpcClient.Unlock()
clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath)
if err != nil {
return err
}
rpcClient.rpc = clnt
return nil
}
func IsRPCError(err error) bool {
// The following are net/rpc specific errors that indicate that
// the connection may have been reset.
if err == rpc.ErrShutdown ||
reflect.TypeOf(err) == reflect.TypeOf((*rpc.ServerError)(nil)).Elem() {
return true
}
return false
}

6
vendor/vendor.json vendored
View File

@ -98,10 +98,10 @@
"revisionTime": "2015-11-18T20:00:48-08:00" "revisionTime": "2015-11-18T20:00:48-08:00"
}, },
{ {
"checksumSHA1": "kbVCnnU0gR/i8WA8Gs2I+/7kONY=", "checksumSHA1": "UmlhYLEvnNk+1e4CEDpVZ3c5mhQ=",
"path": "github.com/minio/dsync", "path": "github.com/minio/dsync",
"revision": "8f4819554f1f4fffc2e1c8c706b23e5c844997f4", "revision": "a095ea2cf13223a1bf7e20efcb83edacc3a610c1",
"revisionTime": "2016-08-17T23:34:37Z" "revisionTime": "2016-08-22T23:56:01Z"
}, },
{ {
"path": "github.com/minio/go-homedir", "path": "github.com/minio/go-homedir",