auth/rpc: Take remote disk offline after maximum allowed attempts. (#3288)

Disks when are offline for a long period of time, we should
ignore the disk after trying Login upto 5 times.

This is to reduce the network chattiness, this also reduces
the overall time spent on `net.Dial`.

Fixes #3286
This commit is contained in:
Harshavardhana 2016-11-20 16:57:12 -08:00 committed by GitHub
parent ffbee70e04
commit 0b9f0d14a1
11 changed files with 136 additions and 81 deletions

View File

@ -97,12 +97,13 @@ type authConfig struct {
// AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects.
type AuthRPCClient struct {
mu sync.Mutex
config *authConfig
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid.
token string // JWT based token
serverVersion string // Server version exchanged by the RPC.
mu sync.Mutex
config *authConfig
rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client
isLoggedIn bool // Indicates if the auth client has been logged in and token is valid.
serverToken string // Disk rpc JWT based token.
serverVersion string // Server version exchanged by the RPC.
serverIOErrCnt int // Keeps track of total errors occurred for each RPC call.
}
// newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect.
@ -127,30 +128,51 @@ func (authClient *AuthRPCClient) Close() error {
}
// Login - a jwt based authentication is performed with rpc server.
func (authClient *AuthRPCClient) Login() error {
func (authClient *AuthRPCClient) Login() (err error) {
authClient.mu.Lock()
// As soon as the function returns unlock,
defer authClient.mu.Unlock()
// Take remote disk offline if the total server errors
// are more than maximum allowable IO error limit.
if authClient.serverIOErrCnt > maxAllowedIOError {
return errFaultyRemoteDisk
}
// In defer sequence this is called first, so error
// increment happens well with in the lock.
defer func() {
if err != nil {
authClient.serverIOErrCnt++
}
}()
// Return if already logged in.
if authClient.isLoggedIn {
return nil
}
reply := RPCLoginReply{}
if err := authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{
if err = authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{
Username: authClient.config.accessKey,
Password: authClient.config.secretKey,
}, &reply); err != nil {
return err
}
// Validate if version do indeed match.
if reply.ServerVersion != Version {
return errServerVersionMismatch
}
// Validate if server timestamp is skewed.
curTime := time.Now().UTC()
if curTime.Sub(reply.Timestamp) > globalMaxSkewTime {
return errServerTimeMismatch
}
// Set token, time stamp as received from a successful login call.
authClient.token = reply.Token
authClient.serverToken = reply.Token
authClient.serverVersion = reply.ServerVersion
authClient.isLoggedIn = true
return nil
@ -166,7 +188,7 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
// On successful login, attempt the call.
if err = authClient.Login(); err == nil {
// Set token and timestamp before the rpc call.
args.SetToken(authClient.token)
args.SetToken(authClient.serverToken)
args.SetTimestamp(time.Now().UTC())
// Call the underlying rpc.
@ -183,17 +205,17 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
}
// Node returns the node (network address) of the connection
func (authClient *AuthRPCClient) Node() string {
func (authClient *AuthRPCClient) Node() (node string) {
if authClient.rpc != nil {
return authClient.rpc.node
node = authClient.rpc.node
}
return ""
return node
}
// RPCPath returns the RPC path of the connection
func (authClient *AuthRPCClient) RPCPath() string {
func (authClient *AuthRPCClient) RPCPath() (rpcPath string) {
if authClient.rpc != nil {
return authClient.rpc.rpcPath
rpcPath = authClient.rpc.rpcPath
}
return ""
return rpcPath
}

View File

@ -0,0 +1,51 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "testing"
// Tests authorized RPC client.
func TestAuthRPCClient(t *testing.T) {
authCfg := &authConfig{
accessKey: "123",
secretKey: "123",
secureConn: false,
address: "localhost:9000",
path: "/rpc/disk",
loginMethod: "MyPackage.LoginHandler",
}
authRPC := newAuthClient(authCfg)
if authRPC.Node() != authCfg.address {
t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address)
}
if authRPC.RPCPath() != authCfg.path {
t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path)
}
authCfg = &authConfig{
accessKey: "123",
secretKey: "123",
secureConn: false,
loginMethod: "MyPackage.LoginHandler",
}
authRPC = newAuthClient(authCfg)
if authRPC.Node() != authCfg.address {
t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address)
}
if authRPC.RPCPath() != authCfg.path {
t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path)
}
}

View File

@ -18,7 +18,6 @@ package cmd
import (
"bytes"
"errors"
"io"
"io/ioutil"
"os"
@ -48,8 +47,6 @@ type posix struct {
pool sync.Pool
}
var errFaultyDisk = errors.New("Faulty disk")
// checkPathLength - returns error if given path name length more than 255
func checkPathLength(pathName string) error {
// Apple OS X path length is limited to 1016

View File

@ -33,6 +33,12 @@ var errDiskFull = errors.New("disk path full")
// errDiskNotFount - cannot find the underlying configured disk anymore.
var errDiskNotFound = errors.New("disk not found")
// errFaultyRemoteDisk - remote disk is faulty.
var errFaultyRemoteDisk = errors.New("remote disk is faulty")
// errFaultyDisk - disk is faulty.
var errFaultyDisk = errors.New("disk is faulty")
// errDiskAccessDenied - we don't have write permissions on disk.
var errDiskAccessDenied = errors.New("disk access denied")

View File

@ -22,6 +22,17 @@ import (
"sync"
)
// list all errors that can be ignored in a bucket metadata operation.
var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
// list all errors that can be ignore in a bucket operation.
var bucketOpIgnoredErrs = []error{
errFaultyDisk,
errFaultyRemoteDisk,
errDiskNotFound,
errDiskAccessDenied,
}
/// Bucket operations
// MakeBucket - make a bucket.
@ -69,11 +80,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
}
// Verify we have any other errors which should undo make bucket.
if reducedErr := reduceErrs(dErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}
return nil
@ -120,14 +127,6 @@ func undoMakeBucket(storageDisks []StorageAPI, bucket string) {
wg.Wait()
}
// list all errors that can be ignored in a bucket metadata operation.
var bucketMetadataOpIgnoredErrs = []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errVolumeNotFound,
}
// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) {
for _, disk := range xl.getLoadBalancedDisks() {
@ -290,11 +289,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
if reducedErr := reduceErrs(dErrs, []error{
errFaultyDisk,
errDiskNotFound,
errDiskAccessDenied,
}); reducedErr != nil {
if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}

View File

@ -122,11 +122,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error
}
// Verify we have any other errors which should be returned as failure.
if reducedErr := reduceErrs(dErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}
return nil

View File

@ -211,6 +211,7 @@ var objMetadataOpIgnoredErrs = []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errFaultyRemoteDisk,
errVolumeNotFound,
errFileAccessDenied,
errFileNotFound,
@ -336,11 +337,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
return traceError(errXLWriteQuorum)
}
return reduceErrs(mErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
})
return reduceErrs(mErrs, objectOpIgnoredErrs)
}
// writeSameXLMetadata - write `xl.json` on all disks in order.
@ -380,9 +377,5 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
return traceError(errXLWriteQuorum)
}
return reduceErrs(mErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
})
return reduceErrs(mErrs, objectOpIgnoredErrs)
}

View File

@ -140,13 +140,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange)
}
wg.Wait()
// Ignored errors list.
ignoredErrs := []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}
return reduceErrs(errs, ignoredErrs)
return reduceErrs(errs, objectOpIgnoredErrs)
}
// Returns if the prefix is a multipart upload.
@ -257,11 +251,5 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in
return traceError(errXLWriteQuorum)
}
// List of ignored errors.
ignoredErrs := []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
}
return reduceErrs(mErrs, ignoredErrs)
return reduceErrs(mErrs, objectOpIgnoredErrs)
}

View File

@ -32,6 +32,14 @@ import (
"github.com/minio/minio/pkg/objcache"
)
// list all errors which can be ignored in object operations.
var objectOpIgnoredErrs = []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errFaultyRemoteDisk,
}
/// Object Operations
// GetObject - reads an object erasured coded across multiple
@ -71,11 +79,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return traceError(InsufficientReadQuorum{}, errs...)
}
if reducedErr := reduceErrs(errs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
if reducedErr := reduceErrs(errs, objectOpIgnoredErrs); reducedErr != nil {
return toObjectErr(reducedErr, bucket, object)
}
@ -333,11 +337,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
return traceError(errXLWriteQuorum)
}
// Return on first error, also undo any partially successful rename operations.
return reduceErrs(errs, []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
})
return reduceErrs(errs, objectOpIgnoredErrs)
}
// renamePart - renames a part of the source object to the destination

View File

@ -52,19 +52,25 @@ func reduceErrs(errs []error, ignoredErrs []error) error {
return traceError(errMax, errs...)
}
// List of all errors which are ignored while verifying quorum.
var quorumIgnoredErrs = []error{
errFaultyDisk,
errFaultyRemoteDisk,
errDiskNotFound,
errDiskAccessDenied,
}
// Validates if we have quorum based on the errors related to disk only.
// Returns 'true' if we have quorum, 'false' if we don't.
func isDiskQuorum(errs []error, minQuorumCount int) bool {
var count int
errs = errorsCause(errs)
for _, err := range errs {
switch err {
case errDiskNotFound, errFaultyDisk, errDiskAccessDenied:
continue
// Check if the error can be ignored for quorum verification.
if !isErrIgnored(err, quorumIgnoredErrs) {
count++
}
count++
}
return count >= minQuorumCount
}

View File

@ -77,6 +77,7 @@ var xlTreeWalkIgnoredErrs = []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errFaultyRemoteDisk,
}
// newXLObjects - initialize new xl object layer.