controller/auth: Implement JWT based authorization for controller. (#2544)

Fixes #2474
This commit is contained in:
Harshavardhana 2016-08-24 10:14:14 -07:00
parent 200d327737
commit 9605fde04d
15 changed files with 240 additions and 162 deletions

View File

@ -17,29 +17,95 @@
package cmd package cmd
import ( import (
"fmt"
"net/rpc" "net/rpc"
"time" "time"
"github.com/minio/dsync" jwtgo "github.com/dgrijalva/jwt-go"
) )
// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. // GenericReply represents any generic RPC reply.
type AuthRPCClient struct { type GenericReply struct{}
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
cred credential // AccessKey and SecretKey // GenericArgs represents any generic RPC arguments.
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. type GenericArgs struct {
token string // JWT based token Token string // Used to authenticate every RPC call.
tstamp time.Time // Timestamp as received on Login RPC. Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair.
}
// SetToken - sets the token to the supplied value.
func (ga *GenericArgs) SetToken(token string) {
ga.Token = token
}
// SetTimestamp - sets the timestamp to the supplied value.
func (ga *GenericArgs) SetTimestamp(tstamp time.Time) {
ga.Timestamp = tstamp
}
// RPCLoginArgs - login username and password for RPC.
type RPCLoginArgs struct {
Username string
Password string
}
// RPCLoginReply - login reply provides generated token to be used
// with subsequent requests.
type RPCLoginReply struct {
Token string
ServerVersion string
Timestamp time.Time
}
// Validates if incoming token is valid.
func isRPCTokenValid(tokenStr string) bool {
jwt, err := newJWT(defaultTokenExpiry) // Expiry set to 100yrs.
if err != nil {
errorIf(err, "Unable to initialize JWT")
return false
}
token, err := jwtgo.Parse(tokenStr, func(token *jwtgo.Token) (interface{}, error) {
if _, ok := token.Method.(*jwtgo.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
return []byte(jwt.SecretAccessKey), nil
})
if err != nil {
errorIf(err, "Unable to parse JWT token string")
return false
}
// Return if token is valid.
return token.Valid
}
// Auth config represents authentication credentials and Login method name to be used
// for fetching JWT tokens from the RPC server.
type authConfig struct {
accessKey string // Username for the server.
secretKey string // Password for the server.
address string // Network address path of RPC server.
path string // Network path for HTTP dial.
loginMethod string // RPC service name for authenticating using JWT loginMethod string // RPC service name for authenticating using JWT
} }
// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects.
type AuthRPCClient struct {
config *authConfig
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid.
token string // JWT based token
tstamp time.Time // Timestamp as received on Login RPC.
}
// newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect. // newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect.
func newAuthClient(node, rpcPath string, cred credential, loginMethod string) *AuthRPCClient { func newAuthClient(cfg *authConfig) *AuthRPCClient {
return &AuthRPCClient{ return &AuthRPCClient{
rpc: newClient(node, rpcPath), // Save the config.
cred: cred, config: cfg,
isLoggedIn: false, // Not logged in yet. // Initialize a new reconnectable rpc client.
loginMethod: loginMethod, rpc: newClient(cfg.address, cfg.path),
// Allocated auth client not logged in yet.
isLoggedIn: false,
} }
} }
@ -57,9 +123,9 @@ func (authClient *AuthRPCClient) Login() error {
return nil return nil
} }
reply := RPCLoginReply{} reply := RPCLoginReply{}
if err := authClient.rpc.Call(authClient.loginMethod, RPCLoginArgs{ if err := authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{
Username: authClient.cred.AccessKeyID, Username: authClient.config.accessKey,
Password: authClient.cred.SecretAccessKey, Password: authClient.config.secretKey,
}, &reply); err != nil { }, &reply); err != nil {
return err return err
} }
@ -73,7 +139,10 @@ func (authClient *AuthRPCClient) Login() error {
// Call - If rpc connection isn't established yet since previous disconnect, // Call - If rpc connection isn't established yet since previous disconnect,
// connection is established, a jwt authenticated login is performed and then // connection is established, a jwt authenticated login is performed and then
// the call is performed. // the call is performed.
func (authClient *AuthRPCClient) Call(serviceMethod string, args dsync.TokenSetter, reply interface{}) (err error) { func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}, reply interface{}) (err error) {
// On successful login, attempt the call. // On successful login, attempt the call.
if err = authClient.Login(); err == nil { if err = authClient.Login(); err == nil {
// Set token and timestamp before the rpc call. // Set token and timestamp before the rpc call.

View File

@ -18,7 +18,6 @@ package cmd
import ( import (
"fmt" "fmt"
"net/rpc"
"net/url" "net/url"
"path" "path"
"strings" "strings"
@ -80,17 +79,22 @@ func healControl(ctx *cli.Context) {
cli.ShowCommandHelpAndExit(ctx, "heal", 1) cli.ShowCommandHelpAndExit(ctx, "heal", 1)
} }
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath)) authCfg := &authConfig{
fatalIf(err, "Unable to connect to %s", parsedURL.Host) accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
address: parsedURL.Host,
path: path.Join(reservedBucket, controlPath),
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
// If object does not have trailing "/" then it's an object, hence heal it. // If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) { if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s", bucketName, objectName) fmt.Printf("Healing : /%s/%s\n", bucketName, objectName)
args := &HealObjectArgs{bucketName, objectName} args := &HealObjectArgs{Bucket: bucketName, Object: objectName}
reply := &HealObjectReply{} reply := &HealObjectReply{}
err = client.Call("Control.HealObject", args, reply) err = client.Call("Controller.HealObjectHandler", args, reply)
fatalIf(err, "RPC Control.HealObject call failed") errorIf(err, "Healing object %s failed.", objectName)
fmt.Println()
return return
} }
@ -98,23 +102,32 @@ func healControl(ctx *cli.Context) {
prefix := objectName prefix := objectName
marker := "" marker := ""
for { for {
args := HealListArgs{bucketName, prefix, marker, "", 1000} args := &HealListArgs{
Bucket: bucketName,
Prefix: prefix,
Marker: marker,
Delimiter: "",
MaxKeys: 1000,
}
reply := &HealListReply{} reply := &HealListReply{}
err = client.Call("Control.ListObjectsHeal", args, reply) err = client.Call("Controller.ListObjectsHealHandler", args, reply)
fatalIf(err, "RPC Heal.ListObjects call failed") fatalIf(err, "Unable to list objects for healing.")
// Heal the objects returned in the ListObjects reply. // Heal the objects returned in the ListObjects reply.
for _, obj := range reply.Objects { for _, obj := range reply.Objects {
fmt.Printf("Healing : /%s/%s", bucketName, obj) fmt.Printf("Healing : /%s/%s\n", bucketName, obj)
reply := &HealObjectReply{} reply := &GenericReply{}
err = client.Call("Control.HealObject", HealObjectArgs{bucketName, obj}, reply) healArgs := &HealObjectArgs{Bucket: bucketName, Object: obj}
fatalIf(err, "RPC Heal.HealObject call failed") err = client.Call("Controller.HealObjectHandler", healArgs, reply)
fmt.Println() errorIf(err, "Healing object %s failed.", obj)
} }
if !reply.IsTruncated { if !reply.IsTruncated {
// End of listing. // End of listing.
break break
} }
// Set the marker to list the next set of keys.
marker = reply.NextMarker marker = reply.NextMarker
} }
} }

View File

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"net/rpc"
"net/url" "net/url"
"path" "path"
@ -55,14 +54,19 @@ func shutdownControl(c *cli.Context) {
cli.ShowCommandHelpAndExit(c, "shutdown", 1) cli.ShowCommandHelpAndExit(c, "shutdown", 1)
} }
parsedURL, err := url.ParseRequestURI(c.Args()[0]) parsedURL, err := url.Parse(c.Args()[0])
fatalIf(err, "Unable to parse URL") fatalIf(err, "Unable to parse URL.")
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath)) authCfg := &authConfig{
fatalIf(err, "Unable to connect to %s", parsedURL.Host) accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
args := &ShutdownArgs{Reboot: c.Bool("restart")} address: parsedURL.Host,
reply := &ShutdownReply{} path: path.Join(reservedBucket, controlPath),
err = client.Call("Control.Shutdown", args, reply) loginMethod: "Controller.LoginHandler",
fatalIf(err, "RPC Control.Shutdown call failed") }
client := newAuthClient(authCfg)
args := &ShutdownArgs{Restart: c.Bool("restart")}
err = client.Call("Controller.ShutdownHandler", args, &GenericReply{})
errorIf(err, "Shutting down Minio server at %s failed.", parsedURL.Host)
} }

View File

@ -16,8 +16,31 @@
package cmd package cmd
/// Auth operations
// Login - login handler.
func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
jwt, err := newJWT(defaultTokenExpiry)
if err != nil {
return err
}
if err = jwt.Authenticate(args.Username, args.Password); err != nil {
return err
}
token, err := jwt.GenerateToken(args.Username)
if err != nil {
return err
}
reply.Token = token
reply.ServerVersion = Version
return nil
}
// HealListArgs - argument for ListObjects RPC. // HealListArgs - argument for ListObjects RPC.
type HealListArgs struct { type HealListArgs struct {
// Authentication token generated by Login.
GenericArgs
Bucket string Bucket string
Prefix string Prefix string
Marker string Marker string
@ -25,7 +48,7 @@ type HealListArgs struct {
MaxKeys int MaxKeys int
} }
// HealListReply - reply by ListObjects RPC. // HealListReply - reply object by ListObjects RPC.
type HealListReply struct { type HealListReply struct {
IsTruncated bool IsTruncated bool
NextMarker string NextMarker string
@ -33,12 +56,15 @@ type HealListReply struct {
} }
// ListObjects - list all objects that needs healing. // ListObjects - list all objects that needs healing.
func (c *controllerAPIHandlers) ListObjectsHeal(arg *HealListArgs, reply *HealListReply) error { func (c *controllerAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *HealListReply) error {
objAPI := c.ObjectAPI() objAPI := c.ObjectAPI()
if objAPI == nil { if objAPI == nil {
return errInvalidArgument return errVolumeBusy
} }
info, err := objAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys) if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
info, err := objAPI.ListObjectsHeal(args.Bucket, args.Prefix, args.Marker, args.Delimiter, args.MaxKeys)
if err != nil { if err != nil {
return err return err
} }
@ -52,7 +78,13 @@ func (c *controllerAPIHandlers) ListObjectsHeal(arg *HealListArgs, reply *HealLi
// HealObjectArgs - argument for HealObject RPC. // HealObjectArgs - argument for HealObject RPC.
type HealObjectArgs struct { type HealObjectArgs struct {
// Authentication token generated by Login.
GenericArgs
// Name of the bucket.
Bucket string Bucket string
// Name of the object.
Object string Object string
} }
@ -60,26 +92,33 @@ type HealObjectArgs struct {
type HealObjectReply struct{} type HealObjectReply struct{}
// HealObject - heal the object. // HealObject - heal the object.
func (c *controllerAPIHandlers) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error { func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error {
objAPI := c.ObjectAPI() objAPI := c.ObjectAPI()
if objAPI == nil { if objAPI == nil {
return errInvalidArgument return errVolumeBusy
} }
return objAPI.HealObject(arg.Bucket, arg.Object) if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
return objAPI.HealObject(args.Bucket, args.Object)
} }
// ShutdownArgs - argument for Shutdown RPC. // ShutdownArgs - argument for Shutdown RPC.
type ShutdownArgs struct { type ShutdownArgs struct {
Reboot bool // Authentication token generated by Login.
GenericArgs
// Should the server be restarted, call active connections are served before server
// is restarted.
Restart bool
} }
// ShutdownReply - reply by Shutdown RPC. // Shutdown - Shutsdown the server.
type ShutdownReply struct{} func (c *controllerAPIHandlers) ShutdownHandler(args *ShutdownArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) {
// Shutdown - Shutdown the server. return errInvalidToken
}
func (c *controllerAPIHandlers) Shutdown(arg *ShutdownArgs, reply *ShutdownReply) error { if args.Restart {
if arg.Reboot {
globalShutdownSignalCh <- shutdownRestart globalShutdownSignalCh <- shutdownRestart
} else { } else {
globalShutdownSignalCh <- shutdownHalt globalShutdownSignalCh <- shutdownHalt

View File

@ -27,10 +27,10 @@ const (
controlPath = "/controller" controlPath = "/controller"
) )
// Register control RPC handlers. // Register controller RPC handlers.
func registerControlRPCRouter(mux *router.Router, ctrlHandlers *controllerAPIHandlers) { func registerControllerRPCRouter(mux *router.Router, ctrlHandlers *controllerAPIHandlers) {
ctrlRPCServer := rpc.NewServer() ctrlRPCServer := rpc.NewServer()
ctrlRPCServer.RegisterName("Control", ctrlHandlers) ctrlRPCServer.RegisterName("Controller", ctrlHandlers)
ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
ctrlRouter.Path(controlPath).Handler(ctrlRPCServer) ctrlRouter.Path(controlPath).Handler(ctrlRPCServer)

View File

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"errors"
"fmt" "fmt"
"net/rpc" "net/rpc"
"path" "path"
@ -59,10 +58,10 @@ type lockServer struct {
func (l *lockServer) verifyArgs(args *LockArgs) error { func (l *lockServer) verifyArgs(args *LockArgs) error {
if !l.timestamp.Equal(args.Timestamp) { if !l.timestamp.Equal(args.Timestamp) {
return errors.New("Timestamps don't match, server may have restarted.") return errInvalidTimestamp
} }
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return nil return nil
} }

View File

@ -34,14 +34,19 @@ var nsMutex *nsLockMap
func initDsyncNodes(disks []string, port int) error { func initDsyncNodes(disks []string, port int) error {
serverPort := strconv.Itoa(port) serverPort := strconv.Itoa(port)
cred := serverConfig.GetCredential() cred := serverConfig.GetCredential()
loginMethod := "Dsync.LoginHandler"
// Initialize rpc lock client information only if this instance is a distributed setup. // Initialize rpc lock client information only if this instance is a distributed setup.
var clnts []dsync.RPC var clnts []dsync.RPC
for _, disk := range disks { for _, disk := range disks {
if idx := strings.LastIndex(disk, ":"); idx != -1 { if idx := strings.LastIndex(disk, ":"); idx != -1 {
dsyncAddr := disk[:idx] + ":" + serverPort // Construct a new dsync server addr. clnts = append(clnts, newAuthClient(&authConfig{
rpcPath := pathutil.Join(lockRPCPath, disk[idx+1:]) // Construct a new rpc path for the disk. accessKey: cred.AccessKeyID,
clnts = append(clnts, newAuthClient(dsyncAddr, rpcPath, cred, loginMethod)) secretKey: cred.SecretAccessKey,
// Construct a new dsync server addr.
address: disk[:idx] + ":" + serverPort,
// Construct a new rpc path for the disk.
path: pathutil.Join(lockRPCPath, disk[idx+1:]),
loginMethod: "Dsync.LoginHandler",
}))
} }
} }
return dsync.SetNodesWithClients(clnts) return dsync.SetNodesWithClients(clnts)

View File

@ -109,7 +109,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
} }
// Initialize Controller. // Initialize Controller.
ctrlHandlers := &controllerAPIHandlers{ controllerHandlers := &controllerAPIHandlers{
ObjectAPI: newObjectLayerFn, ObjectAPI: newObjectLayerFn,
} }
@ -122,11 +122,8 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
// Initialize distributed NS lock. // Initialize distributed NS lock.
initDistributedNSLock(mux, srvCmdConfig) initDistributedNSLock(mux, srvCmdConfig)
// FIXME: till net/rpc auth is brought in "minio control" can be enabled only though // Register controller rpc router.
// this env variable. registerControllerRPCRouter(mux, controllerHandlers)
if !strings.EqualFold(os.Getenv("MINIO_CONTROL"), "off") {
registerControlRPCRouter(mux, ctrlHandlers)
}
// set environmental variable MINIO_BROWSER=off to disable minio web browser. // set environmental variable MINIO_BROWSER=off to disable minio web browser.
// By default minio web browser is enabled. // By default minio web browser is enabled.
@ -134,11 +131,10 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
registerWebRouter(mux, webHandlers) registerWebRouter(mux, webHandlers)
} }
registerAPIRouter(mux, apiHandlers)
// Add new routers here. // Add new routers here.
registerAPIRouter(mux, apiHandlers)
// List of some generic handlers which are applied for all // List of some generic handlers which are applied for all incoming requests.
// incoming requests.
var handlerFns = []HandlerFunc{ var handlerFns = []HandlerFunc{
// Limits the number of concurrent http requests. // Limits the number of concurrent http requests.
setRateLimitHandler, setRateLimitHandler,

View File

@ -24,7 +24,6 @@ import (
) )
type networkStorage struct { type networkStorage struct {
netScheme string
netAddr string netAddr string
netPath string netPath string
rpcClient *AuthRPCClient rpcClient *AuthRPCClient
@ -93,15 +92,15 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
rpcAddr := netAddr + ":" + strconv.Itoa(port) rpcAddr := netAddr + ":" + strconv.Itoa(port)
// Initialize rpc client with network address and rpc path. // Initialize rpc client with network address and rpc path.
cred := serverConfig.GetCredential() cred := serverConfig.GetCredential()
rpcClient := newAuthClient(rpcAddr, rpcPath, cred, "Storage.LoginHandler") rpcClient := newAuthClient(&authConfig{
accessKey: cred.AccessKeyID,
secretKey: cred.SecretAccessKey,
address: rpcAddr,
path: rpcPath,
loginMethod: "Storage.LoginHandler",
})
// Initialize network storage. // Initialize network storage.
ndisk := &networkStorage{ ndisk := &networkStorage{
netScheme: func() string {
if !isSSL() {
return "http"
}
return "https"
}(),
netAddr: netAddr, netAddr: netAddr,
netPath: netPath, netPath: netPath,
rpcClient: rpcClient, rpcClient: rpcClient,

View File

@ -16,41 +16,6 @@
package cmd package cmd
import "time"
// GenericReply represents any generic RPC reply.
type GenericReply struct{}
// GenericArgs represents any generic RPC arguments.
type GenericArgs struct {
Token string // Used to authenticate every RPC call.
Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair.
}
// SetToken - sets the token to the supplied value.
func (ga *GenericArgs) SetToken(token string) {
ga.Token = token
}
// SetTimestamp - sets the timestamp to the supplied value.
func (ga *GenericArgs) SetTimestamp(tstamp time.Time) {
ga.Timestamp = tstamp
}
// RPCLoginArgs - login username and password for RPC.
type RPCLoginArgs struct {
Username string
Password string
}
// RPCLoginReply - login reply provides generated token to be used
// with subsequent requests.
type RPCLoginReply struct {
Token string
ServerVersion string
Timestamp time.Time
}
// GenericVolArgs - generic volume args. // GenericVolArgs - generic volume args.
type GenericVolArgs struct { type GenericVolArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.

View File

@ -18,14 +18,11 @@ package cmd
import ( import (
"bytes" "bytes"
"errors"
"fmt"
"io" "io"
"net/rpc" "net/rpc"
"path" "path"
"strings" "strings"
jwtgo "github.com/dgrijalva/jwt-go"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
) )
@ -36,27 +33,6 @@ type storageServer struct {
path string path string
} }
// Validates if incoming token is valid.
func isRPCTokenValid(tokenStr string) bool {
jwt, err := newJWT(defaultWebTokenExpiry) // Expiry set to 24Hrs.
if err != nil {
errorIf(err, "Unable to initialize JWT")
return false
}
token, err := jwtgo.Parse(tokenStr, func(token *jwtgo.Token) (interface{}, error) {
if _, ok := token.Method.(*jwtgo.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
return []byte(jwt.SecretAccessKey), nil
})
if err != nil {
errorIf(err, "Unable to parse JWT token string")
return false
}
// Return if token is valid.
return token.Valid
}
/// Auth operations /// Auth operations
// Login - login handler. // Login - login handler.
@ -82,7 +58,7 @@ func (s *storageServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) e
// MakeVolHandler - make vol handler is rpc wrapper for MakeVol operation. // MakeVolHandler - make vol handler is rpc wrapper for MakeVol operation.
func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply) error { func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return s.storage.MakeVol(args.Vol) return s.storage.MakeVol(args.Vol)
} }
@ -90,7 +66,7 @@ func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply
// ListVolsHandler - list vols handler is rpc wrapper for ListVols operation. // ListVolsHandler - list vols handler is rpc wrapper for ListVols operation.
func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error { func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
vols, err := s.storage.ListVols() vols, err := s.storage.ListVols()
if err != nil { if err != nil {
@ -103,7 +79,7 @@ func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply)
// StatVolHandler - stat vol handler is a rpc wrapper for StatVol operation. // StatVolHandler - stat vol handler is a rpc wrapper for StatVol operation.
func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) error { func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
volInfo, err := s.storage.StatVol(args.Vol) volInfo, err := s.storage.StatVol(args.Vol)
if err != nil { if err != nil {
@ -117,7 +93,7 @@ func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) err
// DeleteVol operation. // DeleteVol operation.
func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericReply) error { func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return s.storage.DeleteVol(args.Vol) return s.storage.DeleteVol(args.Vol)
} }
@ -127,7 +103,7 @@ func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericRep
// StatFileHandler - stat file handler is rpc wrapper to stat file. // StatFileHandler - stat file handler is rpc wrapper to stat file.
func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) error { func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
fileInfo, err := s.storage.StatFile(args.Vol, args.Path) fileInfo, err := s.storage.StatFile(args.Vol, args.Path)
if err != nil { if err != nil {
@ -140,7 +116,7 @@ func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) err
// ListDirHandler - list directory handler is rpc wrapper to list dir. // ListDirHandler - list directory handler is rpc wrapper to list dir.
func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error { func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
entries, err := s.storage.ListDir(args.Vol, args.Path) entries, err := s.storage.ListDir(args.Vol, args.Path)
if err != nil { if err != nil {
@ -153,7 +129,7 @@ func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error
// ReadAllHandler - read all handler is rpc wrapper to read all storage API. // ReadAllHandler - read all handler is rpc wrapper to read all storage API.
func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error { func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
buf, err := s.storage.ReadAll(args.Vol, args.Path) buf, err := s.storage.ReadAll(args.Vol, args.Path)
if err != nil { if err != nil {
@ -172,7 +148,7 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err
} }
}() // Do not crash the server. }() // Do not crash the server.
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
// Allocate the requested buffer from the client. // Allocate the requested buffer from the client.
*reply = make([]byte, args.Size) *reply = make([]byte, args.Size)
@ -192,7 +168,7 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err
// AppendFileHandler - append file handler is rpc wrapper to append file. // AppendFileHandler - append file handler is rpc wrapper to append file.
func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericReply) error { func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return s.storage.AppendFile(args.Vol, args.Path, args.Buffer) return s.storage.AppendFile(args.Vol, args.Path, args.Buffer)
} }
@ -200,7 +176,7 @@ func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericRe
// DeleteFileHandler - delete file handler is rpc wrapper to delete file. // DeleteFileHandler - delete file handler is rpc wrapper to delete file.
func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *GenericReply) error { func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return s.storage.DeleteFile(args.Vol, args.Path) return s.storage.DeleteFile(args.Vol, args.Path)
} }
@ -208,7 +184,7 @@ func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *GenericRe
// RenameFileHandler - rename file handler is rpc wrapper to rename file. // RenameFileHandler - rename file handler is rpc wrapper to rename file.
func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *GenericReply) error { func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errors.New("Invalid token") return errInvalidToken
} }
return s.storage.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath) return s.storage.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath)
} }

View File

@ -38,6 +38,9 @@ var errSignatureMismatch = errors.New("Signature does not match")
// used when token used for authentication by the MinioBrowser has expired // used when token used for authentication by the MinioBrowser has expired
var errInvalidToken = errors.New("Invalid token") var errInvalidToken = errors.New("Invalid token")
// used when cached timestamp do not match with what client remembers.
var errInvalidTimestamp = errors.New("Timestamps don't match, server may have restarted.")
// If x-amz-content-sha256 header value mismatches with what we calculate. // If x-amz-content-sha256 header value mismatches with what we calculate.
var errContentSHA256Mismatch = errors.New("sha256 mismatch") var errContentSHA256Mismatch = errors.New("sha256 mismatch")

View File

@ -19,6 +19,7 @@ package dsync
import ( import (
"math" "math"
"math/rand" "math/rand"
"net"
"sync" "sync"
"time" "time"
) )
@ -336,11 +337,21 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) {
if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil { if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil {
// RUnlock delivered, exit out // RUnlock delivered, exit out
return return
} else if err != nil {
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
// RUnlock possibly failed with server timestamp mismatch, server may have restarted.
return
}
} }
} else { } else {
if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil { if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil {
// Unlock delivered, exit out // Unlock delivered, exit out
return return
} else if err != nil {
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
// Unlock possibly failed with server timestamp mismatch, server may have restarted.
return
}
} }
} }

View File

@ -18,12 +18,11 @@ package dsync
import "time" import "time"
type TokenSetter interface { // RPC - is dsync compatible client interface.
type RPC interface {
Call(serviceMethod string, args interface {
SetToken(token string) SetToken(token string)
SetTimestamp(tstamp time.Time) SetTimestamp(tstamp time.Time)
} }, reply interface{}) error
type RPC interface {
Call(serviceMethod string, args TokenSetter, reply interface{}) error
Close() error Close() error
} }

6
vendor/vendor.json vendored
View File

@ -98,10 +98,10 @@
"revisionTime": "2015-11-18T20:00:48-08:00" "revisionTime": "2015-11-18T20:00:48-08:00"
}, },
{ {
"checksumSHA1": "UmlhYLEvnNk+1e4CEDpVZ3c5mhQ=", "checksumSHA1": "OOADbvXPHaDRzp8WEvNw6esmfu0=",
"path": "github.com/minio/dsync", "path": "github.com/minio/dsync",
"revision": "a095ea2cf13223a1bf7e20efcb83edacc3a610c1", "revision": "a2d8949cd8284e6cfa5b2ff9b617e4edb87f513f",
"revisionTime": "2016-08-22T23:56:01Z" "revisionTime": "2016-08-24T09:12:34Z"
}, },
{ {
"path": "github.com/minio/go-homedir", "path": "github.com/minio/go-homedir",