minio/internal/dsync/dsync-client_test.go

150 lines
4.3 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dsync
import (
"bytes"
"context"
"errors"
"net/http"
"net/url"
"time"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/rest"
)
// ReconnectRPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
type ReconnectRPCClient struct {
u *url.URL
rpc *rest.Client
}
// newClient constructs a ReconnectRPCClient object with addr and endpoint initialized.
// It _doesn't_ connect to the remote endpoint. See Call method to see when the
// connect happens.
func newClient(endpoint string) NetLocker {
u, err := url.Parse(endpoint)
if err != nil {
panic(err)
}
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 15 * time.Second,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
return &ReconnectRPCClient{
u: u,
rpc: rest.NewClient(u, tr, nil),
}
}
// Close closes the underlying socket file descriptor.
func (rpcClient *ReconnectRPCClient) IsOnline() bool {
// If rpc client has not connected yet there is nothing to close.
return rpcClient.rpc != nil
}
func (rpcClient *ReconnectRPCClient) IsLocal() bool {
return false
}
// Close closes the underlying socket file descriptor.
func (rpcClient *ReconnectRPCClient) Close() error {
return nil
}
var (
errLockConflict = errors.New("lock conflict")
errLockNotFound = errors.New("lock not found")
)
func toLockError(err error) error {
if err == nil {
return nil
}
switch err.Error() {
case errLockConflict.Error():
return errLockConflict
case errLockNotFound.Error():
return errLockNotFound
}
return err
}
// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
func (rpcClient *ReconnectRPCClient) Call(method string, args LockArgs) (status bool, err error) {
buf, err := args.MarshalMsg(nil)
if err != nil {
return false, err
}
body := bytes.NewReader(buf)
respBody, err := rpcClient.rpc.Call(context.Background(), method,
url.Values{}, body, body.Size())
defer xhttp.DrainBody(respBody)
switch toLockError(err) {
case nil:
return true, nil
case errLockConflict, errLockNotFound:
return false, nil
default:
return false, err
}
}
func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) {
return rpcClient.Call("/v1/rlock", args)
}
func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) {
return rpcClient.Call("/v1/lock", args)
}
func (rpcClient *ReconnectRPCClient) RUnlock(ctx context.Context, args LockArgs) (status bool, err error) {
return rpcClient.Call("/v1/runlock", args)
}
func (rpcClient *ReconnectRPCClient) Unlock(ctx context.Context, args LockArgs) (status bool, err error) {
return rpcClient.Call("/v1/unlock", args)
}
func (rpcClient *ReconnectRPCClient) Refresh(ctx context.Context, args LockArgs) (refreshed bool, err error) {
return rpcClient.Call("/v1/refresh", args)
}
func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (reply bool, err error) {
return rpcClient.Call("/v1/force-unlock", args)
}
func (rpcClient *ReconnectRPCClient) String() string {
return rpcClient.u.String()
}