diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index a07d65829..5a79c9a3a 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -97,12 +97,13 @@ type authConfig struct { // AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. type AuthRPCClient struct { - mu sync.Mutex - config *authConfig - rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client - isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. - token string // JWT based token - serverVersion string // Server version exchanged by the RPC. + mu sync.Mutex + config *authConfig + rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client + isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. + serverToken string // Disk rpc JWT based token. + serverVersion string // Server version exchanged by the RPC. + serverIOErrCnt int // Keeps track of total errors occurred for each RPC call. } // newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect. @@ -127,30 +128,51 @@ func (authClient *AuthRPCClient) Close() error { } // Login - a jwt based authentication is performed with rpc server. -func (authClient *AuthRPCClient) Login() error { +func (authClient *AuthRPCClient) Login() (err error) { authClient.mu.Lock() + // As soon as the function returns unlock, defer authClient.mu.Unlock() + + // Take remote disk offline if the total server errors + // are more than maximum allowable IO error limit. + if authClient.serverIOErrCnt > maxAllowedIOError { + return errFaultyRemoteDisk + } + + // In defer sequence this is called first, so error + // increment happens well with in the lock. + defer func() { + if err != nil { + authClient.serverIOErrCnt++ + } + }() + // Return if already logged in. if authClient.isLoggedIn { return nil } + reply := RPCLoginReply{} - if err := authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{ + if err = authClient.rpc.Call(authClient.config.loginMethod, RPCLoginArgs{ Username: authClient.config.accessKey, Password: authClient.config.secretKey, }, &reply); err != nil { return err } + // Validate if version do indeed match. if reply.ServerVersion != Version { return errServerVersionMismatch } + + // Validate if server timestamp is skewed. curTime := time.Now().UTC() if curTime.Sub(reply.Timestamp) > globalMaxSkewTime { return errServerTimeMismatch } + // Set token, time stamp as received from a successful login call. - authClient.token = reply.Token + authClient.serverToken = reply.Token authClient.serverVersion = reply.ServerVersion authClient.isLoggedIn = true return nil @@ -166,7 +188,7 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { // On successful login, attempt the call. if err = authClient.Login(); err == nil { // Set token and timestamp before the rpc call. - args.SetToken(authClient.token) + args.SetToken(authClient.serverToken) args.SetTimestamp(time.Now().UTC()) // Call the underlying rpc. @@ -183,17 +205,17 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { } // Node returns the node (network address) of the connection -func (authClient *AuthRPCClient) Node() string { +func (authClient *AuthRPCClient) Node() (node string) { if authClient.rpc != nil { - return authClient.rpc.node + node = authClient.rpc.node } - return "" + return node } // RPCPath returns the RPC path of the connection -func (authClient *AuthRPCClient) RPCPath() string { +func (authClient *AuthRPCClient) RPCPath() (rpcPath string) { if authClient.rpc != nil { - return authClient.rpc.rpcPath + rpcPath = authClient.rpc.rpcPath } - return "" + return rpcPath } diff --git a/cmd/auth-rpc-client_test.go b/cmd/auth-rpc-client_test.go new file mode 100644 index 000000000..fb27d4822 --- /dev/null +++ b/cmd/auth-rpc-client_test.go @@ -0,0 +1,51 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "testing" + +// Tests authorized RPC client. +func TestAuthRPCClient(t *testing.T) { + authCfg := &authConfig{ + accessKey: "123", + secretKey: "123", + secureConn: false, + address: "localhost:9000", + path: "/rpc/disk", + loginMethod: "MyPackage.LoginHandler", + } + authRPC := newAuthClient(authCfg) + if authRPC.Node() != authCfg.address { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address) + } + if authRPC.RPCPath() != authCfg.path { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path) + } + authCfg = &authConfig{ + accessKey: "123", + secretKey: "123", + secureConn: false, + loginMethod: "MyPackage.LoginHandler", + } + authRPC = newAuthClient(authCfg) + if authRPC.Node() != authCfg.address { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.Node(), authCfg.address) + } + if authRPC.RPCPath() != authCfg.path { + t.Fatalf("Unexpected node value %s, but expected %s", authRPC.RPCPath(), authCfg.path) + } +} diff --git a/cmd/posix.go b/cmd/posix.go index f90d87dfb..e734f2b5a 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -18,7 +18,6 @@ package cmd import ( "bytes" - "errors" "io" "io/ioutil" "os" @@ -48,8 +47,6 @@ type posix struct { pool sync.Pool } -var errFaultyDisk = errors.New("Faulty disk") - // checkPathLength - returns error if given path name length more than 255 func checkPathLength(pathName string) error { // Apple OS X path length is limited to 1016 diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 787febf25..accb27240 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -33,6 +33,12 @@ var errDiskFull = errors.New("disk path full") // errDiskNotFount - cannot find the underlying configured disk anymore. var errDiskNotFound = errors.New("disk not found") +// errFaultyRemoteDisk - remote disk is faulty. +var errFaultyRemoteDisk = errors.New("remote disk is faulty") + +// errFaultyDisk - disk is faulty. +var errFaultyDisk = errors.New("disk is faulty") + // errDiskAccessDenied - we don't have write permissions on disk. var errDiskAccessDenied = errors.New("disk access denied") diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index ebd3437f9..baa90a199 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -22,6 +22,17 @@ import ( "sync" ) +// list all errors that can be ignored in a bucket metadata operation. +var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound) + +// list all errors that can be ignore in a bucket operation. +var bucketOpIgnoredErrs = []error{ + errFaultyDisk, + errFaultyRemoteDisk, + errDiskNotFound, + errDiskAccessDenied, +} + /// Bucket operations // MakeBucket - make a bucket. @@ -69,11 +80,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { } // Verify we have any other errors which should undo make bucket. - if reducedErr := reduceErrs(dErrs, []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - }); reducedErr != nil { + if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { return toObjectErr(reducedErr, bucket) } return nil @@ -120,14 +127,6 @@ func undoMakeBucket(storageDisks []StorageAPI, bucket string) { wg.Wait() } -// list all errors that can be ignored in a bucket metadata operation. -var bucketMetadataOpIgnoredErrs = []error{ - errDiskNotFound, - errDiskAccessDenied, - errFaultyDisk, - errVolumeNotFound, -} - // getBucketInfo - returns the BucketInfo from one of the load balanced disks. func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) { for _, disk := range xl.getLoadBalancedDisks() { @@ -290,11 +289,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return toObjectErr(traceError(errXLWriteQuorum), bucket) } - if reducedErr := reduceErrs(dErrs, []error{ - errFaultyDisk, - errDiskNotFound, - errDiskAccessDenied, - }); reducedErr != nil { + if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { return toObjectErr(reducedErr, bucket) } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 37b0f7464..accb6d31e 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -122,11 +122,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error } // Verify we have any other errors which should be returned as failure. - if reducedErr := reduceErrs(dErrs, []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - }); reducedErr != nil { + if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { return toObjectErr(reducedErr, bucket) } return nil diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index bac05a5db..d4a4388e6 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -211,6 +211,7 @@ var objMetadataOpIgnoredErrs = []error{ errDiskNotFound, errDiskAccessDenied, errFaultyDisk, + errFaultyRemoteDisk, errVolumeNotFound, errFileAccessDenied, errFileNotFound, @@ -336,11 +337,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] return traceError(errXLWriteQuorum) } - return reduceErrs(mErrs, []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - }) + return reduceErrs(mErrs, objectOpIgnoredErrs) } // writeSameXLMetadata - write `xl.json` on all disks in order. @@ -380,9 +377,5 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet return traceError(errXLWriteQuorum) } - return reduceErrs(mErrs, []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - }) + return reduceErrs(mErrs, objectOpIgnoredErrs) } diff --git a/cmd/xl-v1-multipart-common.go b/cmd/xl-v1-multipart-common.go index 061e4c895..c9f47598f 100644 --- a/cmd/xl-v1-multipart-common.go +++ b/cmd/xl-v1-multipart-common.go @@ -140,13 +140,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) } wg.Wait() - // Ignored errors list. - ignoredErrs := []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - } - return reduceErrs(errs, ignoredErrs) + return reduceErrs(errs, objectOpIgnoredErrs) } // Returns if the prefix is a multipart upload. @@ -257,11 +251,5 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in return traceError(errXLWriteQuorum) } - // List of ignored errors. - ignoredErrs := []error{ - errDiskNotFound, - errDiskAccessDenied, - errFaultyDisk, - } - return reduceErrs(mErrs, ignoredErrs) + return reduceErrs(mErrs, objectOpIgnoredErrs) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 75cff8680..f6698f894 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -32,6 +32,14 @@ import ( "github.com/minio/minio/pkg/objcache" ) +// list all errors which can be ignored in object operations. +var objectOpIgnoredErrs = []error{ + errDiskNotFound, + errDiskAccessDenied, + errFaultyDisk, + errFaultyRemoteDisk, +} + /// Object Operations // GetObject - reads an object erasured coded across multiple @@ -71,11 +79,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return traceError(InsufficientReadQuorum{}, errs...) } - if reducedErr := reduceErrs(errs, []error{ - errDiskNotFound, - errFaultyDisk, - errDiskAccessDenied, - }); reducedErr != nil { + if reducedErr := reduceErrs(errs, objectOpIgnoredErrs); reducedErr != nil { return toObjectErr(reducedErr, bucket, object) } @@ -333,11 +337,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, return traceError(errXLWriteQuorum) } // Return on first error, also undo any partially successful rename operations. - return reduceErrs(errs, []error{ - errDiskNotFound, - errDiskAccessDenied, - errFaultyDisk, - }) + return reduceErrs(errs, objectOpIgnoredErrs) } // renamePart - renames a part of the source object to the destination diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 4813bbec5..3b6305fb6 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -52,19 +52,25 @@ func reduceErrs(errs []error, ignoredErrs []error) error { return traceError(errMax, errs...) } +// List of all errors which are ignored while verifying quorum. +var quorumIgnoredErrs = []error{ + errFaultyDisk, + errFaultyRemoteDisk, + errDiskNotFound, + errDiskAccessDenied, +} + // Validates if we have quorum based on the errors related to disk only. // Returns 'true' if we have quorum, 'false' if we don't. func isDiskQuorum(errs []error, minQuorumCount int) bool { var count int errs = errorsCause(errs) for _, err := range errs { - switch err { - case errDiskNotFound, errFaultyDisk, errDiskAccessDenied: - continue + // Check if the error can be ignored for quorum verification. + if !isErrIgnored(err, quorumIgnoredErrs) { + count++ } - count++ } - return count >= minQuorumCount } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index c94e517dd..87d78e3b5 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -77,6 +77,7 @@ var xlTreeWalkIgnoredErrs = []error{ errDiskNotFound, errDiskAccessDenied, errFaultyDisk, + errFaultyRemoteDisk, } // newXLObjects - initialize new xl object layer.