prep: Initialization should wait instead of exiting the servers. (#2872)

- Servers do not exit on invalid credentials; instead they print and wait.
- Servers do not exit on a version mismatch; instead they print and wait.
- Servers do not exit on time differences between nodes; instead they print and wait.
This commit is contained in:
Harshavardhana 2016-10-07 11:15:55 -07:00 committed by GitHub
parent e53a9f6cab
commit f1bc9343a1
16 changed files with 161 additions and 32 deletions

View File

@ -135,6 +135,10 @@ func (authClient *AuthRPCClient) Login() error {
if reply.ServerVersion != Version { if reply.ServerVersion != Version {
return errServerVersionMismatch return errServerVersionMismatch
} }
curTime := time.Now().UTC()
if curTime.Sub(reply.Timestamp) > globalMaxSkewTime {
return errServerTimeMismatch
}
// Set token, time stamp as received from a successful login call. // Set token, time stamp as received from a successful login call.
authClient.token = reply.Token authClient.token = reply.Token
authClient.tstamp = reply.Timestamp authClient.tstamp = reply.Timestamp

View File

@ -24,6 +24,9 @@ var errServerNotInitialized = errors.New("Server not initialized, please try aga
// errServerVersionMismatch - server versions do not match. // errServerVersionMismatch - server versions do not match.
var errServerVersionMismatch = errors.New("Server versions do not match.") var errServerVersionMismatch = errors.New("Server versions do not match.")
// errServerTimeMismatch - server times are too far apart.
var errServerTimeMismatch = errors.New("Server times are too far apart.")
/// Auth operations /// Auth operations
// Login - login handler. // Login - login handler.
@ -40,6 +43,7 @@ func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLogin
return err return err
} }
reply.Token = token reply.Token = token
reply.Timestamp = c.timestamp
reply.ServerVersion = Version reply.ServerVersion = Version
return nil return nil
} }

View File

@ -18,6 +18,7 @@ package cmd
import ( import (
"net/rpc" "net/rpc"
"time"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
) )
@ -33,6 +34,7 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi
ctrlHandlers := &controllerAPIHandlers{ ctrlHandlers := &controllerAPIHandlers{
ObjectAPI: newObjectLayerFn, ObjectAPI: newObjectLayerFn,
StorageDisks: srvCmdConfig.storageDisks, StorageDisks: srvCmdConfig.storageDisks,
timestamp: time.Now().UTC(),
} }
ctrlRPCServer := rpc.NewServer() ctrlRPCServer := rpc.NewServer()
@ -46,4 +48,5 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi
type controllerAPIHandlers struct { type controllerAPIHandlers struct {
ObjectAPI func() ObjectLayer ObjectAPI func() ObjectLayer
StorageDisks []StorageAPI StorageDisks []StorageAPI
timestamp time.Time
} }

View File

@ -202,10 +202,10 @@ func (h timeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErrorResponse(w, r, apiErr, r.URL.Path) writeErrorResponse(w, r, apiErr, r.URL.Path)
return return
} }
// Verify if the request date header is shifted by less than maxSkewTime parameter in the past // Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past
// or in the future, reject request otherwise. // or in the future, reject request otherwise.
curTime := time.Now().UTC() curTime := time.Now().UTC()
if curTime.Sub(amzDate) > maxSkewTime || amzDate.Sub(curTime) > maxSkewTime { if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime {
writeErrorResponse(w, r, ErrRequestTimeTooSkewed, r.URL.Path) writeErrorResponse(w, r, ErrRequestTimeTooSkewed, r.URL.Path)
return return
} }

View File

@ -65,7 +65,7 @@ var (
var ( var (
// The maximum allowed difference between the request generation time and the server processing time // The maximum allowed difference between the request generation time and the server processing time
maxSkewTime = 15 * time.Minute globalMaxSkewTime = 15 * time.Minute
) )
// global colors. // global colors.

View File

@ -134,3 +134,28 @@ func getFormatMsg(storageDisks []StorageAPI) string {
} }
return msg return msg
} }
// printConfigErrMsg builds the cluster configuration error report for the
// given disks and their per-disk errors, then hands that report to fn.
func printConfigErrMsg(storageDisks []StorageAPI, sErrs []error, fn printOnceFunc) {
	fn(getConfigErrMsg(storageDisks, sErrs))
}
// getConfigErrMsg returns a formatted report for a misconfigured cluster.
// The report starts with a header line and then adds one line for every disk
// that is non-nil and has a non-nil entry in sErrs at the same index. Each
// such line carries the 1-based position, the disk and its error.
func getConfigErrMsg(storageDisks []StorageAPI, sErrs []error) string {
	report := colorBlue("\nDetected configuration inconsistencies in the cluster. Please fix following servers.")
	total := len(storageDisks)
	for idx := range storageDisks {
		// Leave out missing disks and disks that reported no error.
		if storageDisks[idx] == nil || sErrs[idx] == nil {
			continue
		}
		entry := fmt.Sprintf("\n[%s] %s : %s", int2Str(idx+1, total), storageDisks[idx], sErrs[idx])
		report += entry
	}
	return report
}

View File

@ -20,15 +20,40 @@ import "testing"
// Tests heal message to be correct and properly formatted. // Tests heal message to be correct and properly formatted.
func TestHealMsg(t *testing.T) { func TestHealMsg(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatal("Unable to initialize test config", err)
}
defer removeAll(rootPath)
storageDisks, fsDirs := prepareXLStorageDisks(t) storageDisks, fsDirs := prepareXLStorageDisks(t)
errs := make([]error, len(storageDisks))
defer removeRoots(fsDirs) defer removeRoots(fsDirs)
nilDisks := deepCopyStorageDisks(storageDisks)
nilDisks[5] = nil
authErrs := make([]error, len(storageDisks))
authErrs[5] = errAuthentication
testCases := []struct { testCases := []struct {
endPoint string endPoint string
storageDisks []StorageAPI storageDisks []StorageAPI
serrs []error
}{ }{
// Test - 1 for valid disks and errors.
{ {
endPoint: "http://10.1.10.1:9000", endPoint: "http://10.1.10.1:9000",
storageDisks: storageDisks, storageDisks: storageDisks,
serrs: errs,
},
// Test - 2 for one of the disks is nil.
{
endPoint: "http://10.1.10.1:9000",
storageDisks: nilDisks,
serrs: errs,
},
// Test - 3 for one of the errs is authentication.
{
endPoint: "http://10.1.10.1:9000",
storageDisks: nilDisks,
serrs: authErrs,
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
@ -44,6 +69,10 @@ func TestHealMsg(t *testing.T) {
if msg == "" { if msg == "" {
t.Fatalf("Test: %d Unable to get format message.", i+1) t.Fatalf("Test: %d Unable to get format message.", i+1)
} }
msg = getConfigErrMsg(testCase.storageDisks, testCase.serrs)
if msg == "" {
t.Fatalf("Test: %d Unable to get config error message.", i+1)
}
} }
} }

View File

@ -93,6 +93,9 @@ const (
// WaitForFormatting - Wait for formatting to be triggered from the '1st' server in the cluster. // WaitForFormatting - Wait for formatting to be triggered from the '1st' server in the cluster.
WaitForFormatting WaitForFormatting
// WaitForConfig - Wait for all servers to have the same config including (credentials, version and time).
WaitForConfig
// InitObjectLayer - Initialize object layer. // InitObjectLayer - Initialize object layer.
InitObjectLayer InitObjectLayer
@ -101,6 +104,26 @@ const (
Abort Abort
) )
// quickErrToActions inspects the per-error counts in errMap and returns
// WaitForConfig as soon as it finds a count above zero for an access key,
// authentication, server version or server time mismatch error. When none of
// those errors were counted it returns the zero value of InitActions.
func quickErrToActions(errMap map[error]int) InitActions {
	configErrs := []error{
		errInvalidAccessKeyID,
		errAuthentication,
		errServerVersionMismatch,
		errServerTimeMismatch,
	}
	for _, cfgErr := range configErrs {
		if errMap[cfgErr] > 0 {
			return WaitForConfig
		}
	}
	// No configuration error was seen.
	var none InitActions
	return none
}
// Preparatory initialization stage for XL: validates known errors and
// converts them into specific actions. The caller decides what needs to be
// done for each action.
func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions { func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions {
// Count errors by error value. // Count errors by error value.
errMap := make(map[error]int) errMap := make(map[error]int)
@ -108,6 +131,11 @@ func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions {
errMap[err]++ errMap[err]++
} }
// Validates and converts specific config errors into WaitForConfig.
if quickErrToActions(errMap) == WaitForConfig {
return WaitForConfig
}
quorum := diskCount/2 + 1 quorum := diskCount/2 + 1
disksOffline := errMap[errDiskNotFound] disksOffline := errMap[errDiskNotFound]
disksFormatted := errMap[nil] disksFormatted := errMap[nil]
@ -151,7 +179,7 @@ func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions {
} }
// Some of the formatted disks are possibly corrupted or unformatted, heal them. // Some of the formatted disks are possibly corrupted or unformatted, heal them.
return WaitForHeal return WaitForHeal
} // No quorum wait for quorum number of disks. } // Exhausted all our checks, un-handled errors perhaps we Abort.
return WaitForQuorum return WaitForQuorum
} }
@ -201,6 +229,9 @@ func retryFormattingDisks(firstDisk bool, firstEndpoint string, storageDisks []S
"Initializing data volume. Waiting for minimum %d servers to come online.\n", "Initializing data volume. Waiting for minimum %d servers to come online.\n",
len(storageDisks)/2+1, len(storageDisks)/2+1,
) )
case WaitForConfig:
// Print configuration errors.
printConfigErrMsg(storageDisks, sErrs, printOnceFn())
case WaitForAll: case WaitForAll:
console.Println("Initializing data volume for first time. Waiting for other servers to come online.") console.Println("Initializing data volume for first time. Waiting for other servers to come online.")
case WaitForFormatting: case WaitForFormatting:

View File

@ -32,6 +32,8 @@ func (action InitActions) String() string {
return "WaitForAll" return "WaitForAll"
case WaitForQuorum: case WaitForQuorum:
return "WaitForQuorum" return "WaitForQuorum"
case WaitForConfig:
return "WaitForConfig"
case Abort: case Abort:
return "Abort" return "Abort"
default: default:
@ -79,6 +81,26 @@ func TestPrepForInitXL(t *testing.T) {
errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound,
errDiskNotFound, nil, nil, nil, errDiskNotFound, nil, nil, nil,
} }
// Invalid access key id.
accessKeyIDErr := []error{
errInvalidAccessKeyID, nil, nil, nil,
nil, nil, nil, nil,
}
// Authentication error.
authenticationErr := []error{
nil, nil, nil, nil,
errAuthentication, nil, nil, nil,
}
// Server version mismatch.
serverVersionMismatch := []error{
errServerVersionMismatch, nil, nil, nil,
errServerVersionMismatch, nil, nil, nil,
}
// Server time mismatch.
serverTimeMismatch := []error{
nil, nil, nil, nil,
errServerTimeMismatch, nil, nil, nil,
}
testCases := []struct { testCases := []struct {
// Params for prepForInit(). // Params for prepForInit().
@ -105,6 +127,11 @@ func TestPrepForInitXL(t *testing.T) {
{false, noQuourm, 8, WaitForQuorum}, {false, noQuourm, 8, WaitForQuorum},
{false, minorityCorrupted, 8, WaitForHeal}, {false, minorityCorrupted, 8, WaitForHeal},
{false, majorityCorrupted, 8, Abort}, {false, majorityCorrupted, 8, Abort},
// Config mistakes.
{true, accessKeyIDErr, 8, WaitForConfig},
{true, authenticationErr, 8, WaitForConfig},
{true, serverVersionMismatch, 8, WaitForConfig},
{true, serverTimeMismatch, 8, WaitForConfig},
} }
for i, test := range testCases { for i, test := range testCases {
actual := prepForInitXL(test.firstDisk, test.errs, test.diskCount) actual := prepForInitXL(test.firstDisk, test.errs, test.diskCount)

View File

@ -45,9 +45,6 @@ func (r *lockedRandSource) Seed(seed int64) {
r.lk.Unlock() r.lk.Unlock()
} }
// MaxRetry is the maximum number of retries before stopping.
var MaxRetry = 5
// MaxJitter will randomize over the full exponential backoff time // MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0 const MaxJitter = 1.0

View File

@ -246,18 +246,6 @@ func checkNamingDisks(disks []string) error {
return nil return nil
} }
// validateRemoteDisks checks that every disk in disks answers a DiskInfo()
// call. Network errors (*net.OpError) are ignored, and nil entries are
// skipped. It returns the first non-network error encountered, or nil when
// no disk reported one.
func validateRemoteDisks(disks []StorageAPI) error {
	for _, disk := range disks {
		// A nil entry has nothing to query; calling a method on it would panic.
		if disk == nil {
			continue
		}
		_, err := disk.DiskInfo()
		if err == nil {
			// This disk is fine; keep checking the remaining disks instead of
			// returning early.
			continue
		}
		if _, ok := err.(*net.OpError); ok {
			// Network errors are deliberately tolerated here.
			continue
		}
		return err
	}
	return nil
}
// Validate input disks. // Validate input disks.
func validateDisks(disks []string, ignoredDisks []string) []StorageAPI { func validateDisks(disks []string, ignoredDisks []string) []StorageAPI {
isXL := len(disks) > 1 isXL := len(disks) > 1
@ -278,10 +266,6 @@ func validateDisks(disks []string, ignoredDisks []string) []StorageAPI {
} }
storageDisks, err := initStorageDisks(disks, ignoredDisks) storageDisks, err := initStorageDisks(disks, ignoredDisks)
fatalIf(err, "Unable to initialize storage disks.") fatalIf(err, "Unable to initialize storage disks.")
if isXL {
err = validateRemoteDisks(storageDisks)
fatalIf(err, "Unable to validate remote disks.")
}
return storageDisks return storageDisks
} }

View File

@ -78,6 +78,10 @@ func (jwt *JWT) GenerateToken(accessKey string) (string, error) {
return token.SignedString([]byte(jwt.SecretAccessKey)) return token.SignedString([]byte(jwt.SecretAccessKey))
} }
var errInvalidAccessKeyID = errors.New("The access key ID you provided does not exist in our records.")
var errAuthentication = errors.New("Authentication failed, check your access credentials.")
// Authenticate - authenticates incoming access key and secret key. // Authenticate - authenticates incoming access key and secret key.
func (jwt *JWT) Authenticate(accessKey, secretKey string) error { func (jwt *JWT) Authenticate(accessKey, secretKey string) error {
// Trim spaces. // Trim spaces.
@ -91,13 +95,12 @@ func (jwt *JWT) Authenticate(accessKey, secretKey string) error {
} }
if accessKey != jwt.AccessKeyID { if accessKey != jwt.AccessKeyID {
return errors.New("Access key does not match") return errInvalidAccessKeyID
} }
hashedSecretKey, _ := bcrypt.GenerateFromPassword([]byte(jwt.SecretAccessKey), bcrypt.DefaultCost) hashedSecretKey, _ := bcrypt.GenerateFromPassword([]byte(jwt.SecretAccessKey), bcrypt.DefaultCost)
if bcrypt.CompareHashAndPassword(hashedSecretKey, []byte(secretKey)) != nil { if bcrypt.CompareHashAndPassword(hashedSecretKey, []byte(secretKey)) != nil {
return errors.New("Authentication failed") return errAuthentication
} }
// Success. // Success.

View File

@ -201,9 +201,9 @@ func TestAuthenticate(t *testing.T) {
// Secret key too long. // Secret key too long.
{"myuser", "pass1234567890123456789012345678901234567", fmt.Errorf("Invalid secret key")}, {"myuser", "pass1234567890123456789012345678901234567", fmt.Errorf("Invalid secret key")},
// Authentication error. // Authentication error.
{"myuser", "mypassword", fmt.Errorf("Access key does not match")}, {"myuser", "mypassword", errInvalidAccessKeyID},
// Authentication error. // Authentication error.
{serverConfig.GetCredential().AccessKeyID, "mypassword", fmt.Errorf("Authentication failed")}, {serverConfig.GetCredential().AccessKeyID, "mypassword", errAuthentication},
// Success. // Success.
{serverConfig.GetCredential().AccessKeyID, serverConfig.GetCredential().SecretAccessKey, nil}, {serverConfig.GetCredential().AccessKeyID, serverConfig.GetCredential().SecretAccessKey, nil},
// Success when access key contains leading/trailing spaces. // Success when access key contains leading/trailing spaces.
@ -213,12 +213,10 @@ func TestAuthenticate(t *testing.T) {
// Run tests. // Run tests.
for _, testCase := range testCases { for _, testCase := range testCases {
err := jwt.Authenticate(testCase.accessKey, testCase.secretKey) err := jwt.Authenticate(testCase.accessKey, testCase.secretKey)
if testCase.expectedErr != nil { if testCase.expectedErr != nil {
if err == nil { if err == nil {
t.Fatalf("%+v: expected: %s, got: <nil>", testCase, testCase.expectedErr) t.Fatalf("%+v: expected: %s, got: <nil>", testCase, testCase.expectedErr)
} }
if testCase.expectedErr.Error() != err.Error() { if testCase.expectedErr.Error() != err.Error() {
t.Fatalf("%+v: expected: %s, got: %s", testCase, testCase.expectedErr, err) t.Fatalf("%+v: expected: %s, got: %s", testCase, testCase.expectedErr, err)
} }

View File

@ -81,6 +81,14 @@ func toStorageErr(err error) error {
return errCorruptedFormat return errCorruptedFormat
case errUnformattedDisk.Error(): case errUnformattedDisk.Error():
return errUnformattedDisk return errUnformattedDisk
case errInvalidAccessKeyID.Error():
return errInvalidAccessKeyID
case errAuthentication.Error():
return errAuthentication
case errServerVersionMismatch.Error():
return errServerVersionMismatch
case errServerTimeMismatch.Error():
return errServerTimeMismatch
} }
return err return err
} }

View File

@ -104,6 +104,22 @@ func TestStorageErr(t *testing.T) {
expectedErr: errFileNameTooLong, expectedErr: errFileNameTooLong,
err: fmt.Errorf("%s", errFileNameTooLong.Error()), err: fmt.Errorf("%s", errFileNameTooLong.Error()),
}, },
{
expectedErr: errInvalidAccessKeyID,
err: fmt.Errorf("%s", errInvalidAccessKeyID.Error()),
},
{
expectedErr: errAuthentication,
err: fmt.Errorf("%s", errAuthentication.Error()),
},
{
expectedErr: errServerVersionMismatch,
err: fmt.Errorf("%s", errServerVersionMismatch.Error()),
},
{
expectedErr: errServerTimeMismatch,
err: fmt.Errorf("%s", errServerTimeMismatch.Error()),
},
{ {
expectedErr: unknownErr, expectedErr: unknownErr,
err: unknownErr, err: unknownErr,

View File

@ -229,7 +229,7 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
if len(ignoredExports) > 0 { if len(ignoredExports) > 0 {
ignoredSet = set.CreateStringSet(ignoredExports...) ignoredSet = set.CreateStringSet(ignoredExports...)
} }
t := time.Now().UTC() tstamp := time.Now().UTC()
for _, export := range exports { for _, export := range exports {
if ignoredSet.Contains(export) { if ignoredSet.Contains(export) {
// Ignore initializing ignored export. // Ignore initializing ignored export.
@ -251,7 +251,7 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
servers = append(servers, &storageServer{ servers = append(servers, &storageServer{
storage: storage, storage: storage,
path: export, path: export,
timestamp: t, timestamp: tstamp,
}) })
} }
} }