minio/cmd/rpc.go
Anis Elleuch f9fecf0e76 storage: Increase the timeout of storage REST requests (#7218)
This commit increases storage REST requests to 5 minutes, this includes
the opening TCP connection, and sending/receiving data. This will reduce
clients receiving errors when the server is under high load.
2019-02-12 23:27:33 -08:00

277 lines
6.4 KiB
Go

/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"crypto/tls"
"fmt"
"net"
"net/url"
"sync"
"time"
xrpc "github.com/minio/minio/cmd/rpc"
xnet "github.com/minio/minio/pkg/net"
)
// DefaultSkewTime - skew time is 15 minutes between minio peers.
const DefaultSkewTime = 15 * time.Minute
// defaultRPCTimeout - default RPC timeout is one minute.
const defaultRPCTimeout = 5 * time.Minute
// defaultRPCRetryTime - default RPC time to wait before retry after a network error
const defaultRPCRetryTime = 1 * time.Minute
var errRPCRetry = fmt.Errorf("rpc: retry error")
func isNetError(err error) bool {
if err == nil {
return false
}
if uerr, isURLError := err.(*url.Error); isURLError {
if uerr.Timeout() {
return true
}
err = uerr.Err
}
_, isNetOpError := err.(*net.OpError)
return isNetOpError
}
// RPCVersion - RPC semantic version based on semver 2.0.0 https://semver.org/.
type RPCVersion struct {
Major uint64
Minor uint64
Patch uint64
}
// Compare - compares given version with this version.
func (v RPCVersion) Compare(o RPCVersion) int {
compare := func(v1, v2 uint64) int {
if v1 == v2 {
return 0
}
if v1 > v2 {
return 1
}
return -1
}
if r := compare(v.Major, o.Major); r != 0 {
return r
}
if r := compare(v.Minor, o.Minor); r != 0 {
return r
}
return compare(v.Patch, o.Patch)
}
func (v RPCVersion) String() string {
return fmt.Sprintf("%v.%v.%v", v.Major, v.Minor, v.Patch)
}
// AuthArgs - base argument for any RPC call for authentication.
type AuthArgs struct {
Token string
RPCVersion RPCVersion
RequestTime time.Time
}
// Authenticate - checks if given arguments are valid to allow RPC call.
// This is xrpc.Authenticator and is called in RPC server.
func (args AuthArgs) Authenticate() error {
// Check whether request time is within acceptable skew time.
utcNow := time.Now().UTC()
if args.RequestTime.Sub(utcNow) > DefaultSkewTime || utcNow.Sub(args.RequestTime) > DefaultSkewTime {
return fmt.Errorf("client time %v is too apart with server time %v", args.RequestTime, utcNow)
}
if globalRPCAPIVersion.Compare(args.RPCVersion) != 0 {
return fmt.Errorf("version mismatch. expected: %v, received: %v", globalRPCAPIVersion, args.RPCVersion)
}
if !isAuthTokenValid(args.Token) {
return errAuthentication
}
return nil
}
// SetAuthArgs - sets given authentication arguments to this args. This is called in RPC client.
func (args *AuthArgs) SetAuthArgs(authArgs AuthArgs) {
*args = authArgs
}
// VoidReply - void (empty) RPC reply.
type VoidReply struct{}
// RPCClientArgs - RPC client arguments.
type RPCClientArgs struct {
NewAuthTokenFunc func() string
RPCVersion RPCVersion
ServiceName string
ServiceURL *xnet.URL
TLSConfig *tls.Config
}
// validate - checks whether given args are valid or not.
func (args RPCClientArgs) validate() error {
if args.NewAuthTokenFunc == nil {
return fmt.Errorf("NewAuthTokenFunc must not be empty")
}
if args.ServiceName == "" {
return fmt.Errorf("ServiceName must not be empty")
}
if args.ServiceURL.Scheme != "http" && args.ServiceURL.Scheme != "https" {
return fmt.Errorf("unknown RPC URL %v", args.ServiceURL)
}
if args.ServiceURL.User != nil || args.ServiceURL.ForceQuery || args.ServiceURL.RawQuery != "" || args.ServiceURL.Fragment != "" {
return fmt.Errorf("unknown RPC URL %v", args.ServiceURL)
}
if args.ServiceURL.Scheme == "https" && args.TLSConfig == nil {
return fmt.Errorf("tls configuration must not be empty for https url %v", args.ServiceURL)
}
return nil
}
// RPCClient - base RPC client.
type RPCClient struct {
sync.RWMutex
args RPCClientArgs
authToken string
rpcClient *xrpc.Client
retryTicker *time.Ticker
}
func (client *RPCClient) setRetryTicker(ticker *time.Ticker) {
if ticker == nil {
client.RLock()
isNil := client.retryTicker == nil
client.RUnlock()
if isNil {
return
}
}
client.Lock()
defer client.Unlock()
if client.retryTicker != nil {
client.retryTicker.Stop()
}
client.retryTicker = ticker
}
// Call - calls servicemethod on remote server.
func (client *RPCClient) Call(serviceMethod string, args interface {
SetAuthArgs(args AuthArgs)
}, reply interface{}) (err error) {
lockedCall := func() error {
client.RLock()
retryTicker := client.retryTicker
client.RUnlock()
if retryTicker != nil {
select {
case <-retryTicker.C:
default:
return errRPCRetry
}
}
client.RLock()
authToken := client.authToken
client.RUnlock()
// Make RPC call.
args.SetAuthArgs(AuthArgs{authToken, client.args.RPCVersion, time.Now().UTC()})
return client.rpcClient.Call(serviceMethod, args, reply)
}
call := func() error {
err = lockedCall()
if err == errRPCRetry {
return err
}
if isNetError(err) {
client.setRetryTicker(time.NewTicker(defaultRPCRetryTime))
} else {
client.setRetryTicker(nil)
}
return err
}
// If authentication error is received, retry the same call only once
// with new authentication token.
if err = call(); err == nil {
return nil
}
if err.Error() != errAuthentication.Error() {
return err
}
client.Lock()
client.authToken = client.args.NewAuthTokenFunc()
client.Unlock()
return call()
}
// Close - closes underneath RPC client.
func (client *RPCClient) Close() error {
client.Lock()
defer client.Unlock()
client.authToken = ""
return client.rpcClient.Close()
}
// ServiceURL - returns service URL used for RPC call.
func (client *RPCClient) ServiceURL() *xnet.URL {
// Take copy of ServiceURL
u := *(client.args.ServiceURL)
return &u
}
// NewRPCClient - returns new RPC client.
func NewRPCClient(args RPCClientArgs) (*RPCClient, error) {
if err := args.validate(); err != nil {
return nil, err
}
return &RPCClient{
args: args,
authToken: args.NewAuthTokenFunc(),
rpcClient: xrpc.NewClient(args.ServiceURL, args.TLSConfig, defaultRPCTimeout),
}, nil
}