mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
2f9e2147f5
PUT calls cannot afford to have large latency build-ups due to contentious usage.json, or worse letting them fail with some unexpected error, this can happen when this file is concurrently being updated via scanner or it is being healed during a disk replacement heal. However, these are fairly quick in theory, stressed clusters can quickly show visible latency this can add up leading to invalid errors returned during PUT. It is perhaps okay for us to relax this error return requirement instead, make sure that we log that we are proceeding to take in the requests while the quota is using an older value for the quota enforcement. These things will reconcile themselves eventually, via scanner making sure to overwrite the usage.json. Bonus: make sure that storage-rest-client sets ExpectTimeouts to be 'true', such that DiskInfo() call with contextTimeout does not prematurely disconnect the servers leading to a longer healthCheck, back-off routine. This can easily pile up while also causing active callers to disconnect, leading to quorum loss. DiskInfo is actively used in the PUT, Multipart call path for upgrading parity when disks are down, it in-turn shouldn't cause more disks to go down.
947 lines
27 KiB
Go
947 lines
27 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/madmin-go/v2"
|
|
"github.com/minio/minio/internal/bucket/bandwidth"
|
|
"github.com/minio/minio/internal/event"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/minio/internal/rest"
|
|
"github.com/minio/pkg/logger/message/log"
|
|
xnet "github.com/minio/pkg/net"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
|
|
// client to talk to peer Nodes.
|
|
type peerRESTClient struct {
|
|
host *xnet.Host
|
|
restClient *rest.Client
|
|
}
|
|
|
|
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
|
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
|
// after verifying format.json
|
|
func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
|
return client.callWithContext(GlobalContext, method, values, body, length)
|
|
}
|
|
|
|
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
|
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
|
// after verifying format.json
|
|
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
|
if client == nil || !client.IsOnline() {
|
|
return nil, errPeerNotReachable
|
|
}
|
|
|
|
if values == nil {
|
|
values = make(url.Values)
|
|
}
|
|
|
|
respBody, err = client.restClient.Call(ctx, method, values, body, length)
|
|
if err == nil {
|
|
return respBody, nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Stringer provides a canonicalized representation of node.
|
|
func (client *peerRESTClient) String() string {
|
|
return client.host.String()
|
|
}
|
|
|
|
// IsOnline returns true if the peer client is online.
|
|
func (client *peerRESTClient) IsOnline() bool {
|
|
return client.restClient.IsOnline()
|
|
}
|
|
|
|
// Close - marks the client as closed.
|
|
func (client *peerRESTClient) Close() error {
|
|
client.restClient.Close()
|
|
return nil
|
|
}
|
|
|
|
// GetLocks - fetch older locks for a remote node.
|
|
func (client *peerRESTClient) GetLocks() (lockMap map[string][]lockRequesterInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
lockMap = map[string][]lockRequesterInfo{}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&lockMap)
|
|
return lockMap, err
|
|
}
|
|
|
|
// LocalStorageInfo - fetch server information for a remote node.
|
|
func (client *peerRESTClient) LocalStorageInfo() (info StorageInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodLocalStorageInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// ServerInfo - fetch server information for a remote node.
|
|
func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err error) {
|
|
respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetCPUs - fetch CPU information for a remote node.
|
|
func (client *peerRESTClient) GetCPUs(ctx context.Context) (info madmin.CPUs, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodCPUInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetPartitions - fetch disk partition information for a remote node.
|
|
func (client *peerRESTClient) GetPartitions(ctx context.Context) (info madmin.Partitions, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDiskHwInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetOSInfo - fetch OS information for a remote node.
|
|
func (client *peerRESTClient) GetOSInfo(ctx context.Context) (info madmin.OSInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodOsInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetSELinuxInfo - fetch SELinux information for a remote node.
|
|
func (client *peerRESTClient) GetSELinuxInfo(ctx context.Context) (info madmin.SysServices, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodSysServices, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetSysConfig - fetch sys config for a remote node.
|
|
func (client *peerRESTClient) GetSysConfig(ctx context.Context) (info madmin.SysConfig, err error) {
|
|
sent := time.Now()
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodSysConfig, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
roundtrip := int32(time.Since(sent).Milliseconds())
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
if ti, ok := info.Config["time-info"].(madmin.TimeInfo); ok {
|
|
ti.RoundtripDuration = roundtrip
|
|
info.Config["time-info"] = ti
|
|
}
|
|
return info, err
|
|
}
|
|
|
|
// GetSysErrors - fetch sys errors for a remote node.
|
|
func (client *peerRESTClient) GetSysErrors(ctx context.Context) (info madmin.SysErrors, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodSysErrors, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetMemInfo - fetch memory information for a remote node.
|
|
func (client *peerRESTClient) GetMemInfo(ctx context.Context) (info madmin.MemInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodMemInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetMetrics - fetch metrics from a remote node.
|
|
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType, opts collectMetricsOpts) (info madmin.RealtimeMetrics, err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTMetricsTypes, strconv.FormatUint(uint64(t), 10))
|
|
for disk := range opts.disks {
|
|
values.Set(peerRESTDisk, disk)
|
|
}
|
|
values.Set(peerRESTJobID, opts.jobID)
|
|
values.Set(peerRESTDepID, opts.depID)
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetProcInfo - fetch MinIO process information for a remote node.
|
|
func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.ProcInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodProcInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// StartProfiling - Issues profiling command on the peer node.
|
|
func (client *peerRESTClient) StartProfiling(profiler string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTProfiler, profiler)
|
|
respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DownloadProfileData - download profiled data from a remote node.
|
|
func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) {
|
|
respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&data)
|
|
return data, err
|
|
}
|
|
|
|
// GetBucketStats - load bucket statistics
|
|
func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodGetBucketStats, values, nil, -1)
|
|
if err != nil {
|
|
return BucketStats{}, err
|
|
}
|
|
|
|
var bs BucketStats
|
|
defer xhttp.DrainBody(respBody)
|
|
return bs, msgp.Decode(respBody, &bs)
|
|
}
|
|
|
|
// GetAllBucketStats - load replication stats for all buckets
|
|
func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) {
|
|
values := make(url.Values)
|
|
respBody, err := client.call(peerRESTMethodGetAllBucketStats, values, nil, -1)
|
|
if err != nil {
|
|
return BucketStatsMap{}, err
|
|
}
|
|
|
|
bsMap := BucketStatsMap{}
|
|
defer xhttp.DrainBody(respBody)
|
|
return bsMap, msgp.Decode(respBody, &bsMap)
|
|
}
|
|
|
|
// LoadBucketMetadata - load bucket metadata
|
|
func (client *peerRESTClient) LoadBucketMetadata(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodLoadBucketMetadata, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeleteBucketMetadata - Delete bucket metadata
|
|
func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodDeleteBucketMetadata, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeletePolicy - delete a specific canned policy.
|
|
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTPolicy, policyName)
|
|
|
|
respBody, err := client.call(peerRESTMethodDeletePolicy, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadPolicy - reload a specific canned policy.
|
|
func (client *peerRESTClient) LoadPolicy(policyName string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTPolicy, policyName)
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadPolicy, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadPolicyMapping - reload a specific policy mapping
|
|
func (client *peerRESTClient) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUserOrGroup, userOrGroup)
|
|
values.Set(peerRESTUserType, strconv.Itoa(int(userType)))
|
|
if isGroup {
|
|
values.Set(peerRESTIsGroup, "")
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadPolicyMapping, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeleteUser - delete a specific user.
|
|
func (client *peerRESTClient) DeleteUser(accessKey string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
|
|
respBody, err := client.call(peerRESTMethodDeleteUser, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeleteServiceAccount - delete a specific service account.
|
|
func (client *peerRESTClient) DeleteServiceAccount(accessKey string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
|
|
respBody, err := client.call(peerRESTMethodDeleteServiceAccount, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadUser - reload a specific user.
|
|
func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
values.Set(peerRESTUserTemp, strconv.FormatBool(temp))
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadUser, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadServiceAccount - reload a specific service account.
|
|
func (client *peerRESTClient) LoadServiceAccount(accessKey string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadServiceAccount, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadGroup - send load group command to peers.
|
|
func (client *peerRESTClient) LoadGroup(group string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTGroup, group)
|
|
respBody, err := client.call(peerRESTMethodLoadGroup, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
type binaryInfo struct {
|
|
URL *url.URL
|
|
Sha256Sum []byte
|
|
ReleaseInfo string
|
|
BinaryFile []byte
|
|
}
|
|
|
|
// VerifyBinary - sends verify binary message to remote peers.
|
|
func (client *peerRESTClient) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, readerInput []byte) error {
|
|
values := make(url.Values)
|
|
var reader bytes.Buffer
|
|
if err := gob.NewEncoder(&reader).Encode(binaryInfo{
|
|
URL: u,
|
|
Sha256Sum: sha256Sum,
|
|
ReleaseInfo: releaseInfo,
|
|
BinaryFile: readerInput,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDownloadBinary, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// CommitBinary - sends commit binary message to remote peers.
|
|
func (client *peerRESTClient) CommitBinary(ctx context.Context) error {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodCommitBinary, nil, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SignalService - sends signal to peer nodes.
|
|
func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTSignal, strconv.Itoa(int(sig)))
|
|
values.Set(peerRESTSubSys, subSys)
|
|
respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) {
|
|
respBody, err := client.call(peerRESTMethodBackgroundHealStatus, nil, nil, -1)
|
|
if err != nil {
|
|
return madmin.BgHealState{}, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
state := madmin.BgHealState{}
|
|
err = gob.NewDecoder(respBody).Decode(&state)
|
|
return state, err
|
|
}
|
|
|
|
// GetLocalDiskIDs - get a peer's local disks' IDs.
|
|
func (client *peerRESTClient) GetLocalDiskIDs(ctx context.Context) (diskIDs []string) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodGetLocalDiskIDs, nil, nil, -1)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return nil
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
if err = gob.NewDecoder(respBody).Decode(&diskIDs); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return nil
|
|
}
|
|
return diskIDs
|
|
}
|
|
|
|
// GetMetacacheListing - get a new or existing metacache.
|
|
func (client *peerRESTClient) GetMetacacheListing(ctx context.Context, o listPathOptions) (*metacache, error) {
|
|
if client == nil {
|
|
resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o)
|
|
return &resp, nil
|
|
}
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(o)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodGetMetacacheListing, nil, &reader, int64(reader.Len()))
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return nil, err
|
|
}
|
|
var resp metacache
|
|
defer xhttp.DrainBody(respBody)
|
|
return &resp, msgp.Decode(respBody, &resp)
|
|
}
|
|
|
|
// UpdateMetacacheListing - update an existing metacache it will unconditionally be updated to the new state.
|
|
func (client *peerRESTClient) UpdateMetacacheListing(ctx context.Context, m metacache) (metacache, error) {
|
|
if client == nil {
|
|
return localMetacacheMgr.updateCacheEntry(m)
|
|
}
|
|
b, err := m.MarshalMsg(nil)
|
|
if err != nil {
|
|
return m, err
|
|
}
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodUpdateMetacacheListing, nil, bytes.NewBuffer(b), int64(len(b)))
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return m, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
var resp metacache
|
|
return resp, msgp.Decode(respBody, &resp)
|
|
}
|
|
|
|
func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodReloadPoolMeta, nil, nil, 0)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) StopRebalance(ctx context.Context) error {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodStopRebalance, nil, nil, 0)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) LoadRebalanceMeta(ctx context.Context, startRebalance bool) error {
|
|
values := url.Values{}
|
|
values.Set(peerRESTStartRebalance, strconv.FormatBool(startRebalance))
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadRebalanceMeta, values, nil, 0)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) doTrace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
|
|
values := make(url.Values)
|
|
traceOpts.AddParams(values)
|
|
|
|
// To cancel the REST request in case doneCh gets closed.
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
|
|
cancelCh := make(chan struct{})
|
|
defer close(cancelCh)
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
case <-cancelCh:
|
|
// There was an error in the REST request.
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1)
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
for {
|
|
var info madmin.TraceInfo
|
|
if err = dec.Decode(&info); err != nil {
|
|
return
|
|
}
|
|
if len(info.NodeName) > 0 {
|
|
select {
|
|
case traceCh <- info:
|
|
default:
|
|
// Do not block on slow receivers.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *peerRESTClient) doListen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) {
|
|
// To cancel the REST request in case doneCh gets closed.
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
|
|
cancelCh := make(chan struct{})
|
|
defer close(cancelCh)
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
case <-cancelCh:
|
|
// There was an error in the REST request.
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodListen, v, nil, -1)
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
for {
|
|
var ev event.Event
|
|
if err := dec.Decode(&ev); err != nil {
|
|
return
|
|
}
|
|
if len(ev.EventVersion) > 0 {
|
|
select {
|
|
case listenCh <- ev:
|
|
default:
|
|
// Do not block on slow receivers.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Listen - listen on peers.
|
|
func (client *peerRESTClient) Listen(listenCh chan<- event.Event, doneCh <-chan struct{}, v url.Values) {
|
|
go func() {
|
|
for {
|
|
client.doListen(listenCh, doneCh, v)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry after sometime as probably the peer is down.
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Trace - send http trace request to peer nodes
|
|
func (client *peerRESTClient) Trace(traceCh chan<- madmin.TraceInfo, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
|
|
go func() {
|
|
for {
|
|
client.doTrace(traceCh, doneCh, traceOpts)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry after sometime as probably the peer is down.
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (client *peerRESTClient) doConsoleLog(logCh chan log.Info, doneCh <-chan struct{}) {
|
|
// To cancel the REST request in case doneCh gets closed.
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
|
|
cancelCh := make(chan struct{})
|
|
defer close(cancelCh)
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
case <-cancelCh:
|
|
// There was an error in the REST request.
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
|
|
defer xhttp.DrainBody(respBody)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
for {
|
|
var lg log.Info
|
|
if err = dec.Decode(&lg); err != nil {
|
|
break
|
|
}
|
|
select {
|
|
case logCh <- lg:
|
|
default:
|
|
// Do not block on slow receivers.
|
|
}
|
|
}
|
|
}
|
|
|
|
// ConsoleLog - sends request to peer nodes to get console logs
|
|
func (client *peerRESTClient) ConsoleLog(logCh chan log.Info, doneCh <-chan struct{}) {
|
|
go func() {
|
|
for {
|
|
client.doConsoleLog(logCh, doneCh)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry after sometime as probably the peer is down.
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// newPeerRestClients creates new peer clients.
|
|
// The two slices will point to the same clients,
|
|
// but 'all' will contain nil entry for local client.
|
|
// The 'all' slice will be in the same order across the cluster.
|
|
func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTClient) {
|
|
if !globalIsDistErasure {
|
|
// Only useful in distributed setups
|
|
return nil, nil
|
|
}
|
|
hosts := endpoints.hostsSorted()
|
|
remote = make([]*peerRESTClient, 0, len(hosts))
|
|
all = make([]*peerRESTClient, len(hosts))
|
|
for i, host := range hosts {
|
|
if host == nil {
|
|
continue
|
|
}
|
|
all[i] = newPeerRESTClient(host)
|
|
remote = append(remote, all[i])
|
|
}
|
|
if len(all) != len(remote)+1 {
|
|
logger.LogIf(context.Background(), fmt.Errorf("WARNING: Expected number of all hosts (%v) to be remote +1 (%v)", len(all), len(remote)))
|
|
}
|
|
return remote, all
|
|
}
|
|
|
|
// Returns a peer rest client.
|
|
func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
|
|
scheme := "http"
|
|
if globalIsTLS {
|
|
scheme = "https"
|
|
}
|
|
|
|
serverURL := &url.URL{
|
|
Scheme: scheme,
|
|
Host: peer.String(),
|
|
Path: peerRESTPath,
|
|
}
|
|
|
|
restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
|
|
// Use a separate client to avoid recursive calls.
|
|
healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
|
|
healthClient.NoMetrics = true
|
|
|
|
// Construct a new health function.
|
|
restClient.HealthCheckFn = func() bool {
|
|
ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
|
|
defer cancel()
|
|
respBody, err := healthClient.Call(ctx, peerRESTMethodHealth, nil, nil, -1)
|
|
xhttp.DrainBody(respBody)
|
|
return !isNetworkError(err)
|
|
}
|
|
|
|
return &peerRESTClient{host: peer, restClient: restClient}
|
|
}
|
|
|
|
// MonitorBandwidth - send http trace request to peer nodes
|
|
func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.BucketBandwidthReport, error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBuckets, strings.Join(buckets, ","))
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodGetBandwidth, values, nil, -1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
var bandwidthReport bandwidth.BucketBandwidthReport
|
|
err = dec.Decode(&bandwidthReport)
|
|
return &bandwidthReport, err
|
|
}
|
|
|
|
func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric, error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodGetPeerMetrics, nil, nil, -1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dec := gob.NewDecoder(respBody)
|
|
ch := make(chan Metric)
|
|
go func(ch chan<- Metric) {
|
|
defer func() {
|
|
xhttp.DrainBody(respBody)
|
|
close(ch)
|
|
}()
|
|
for {
|
|
var metric Metric
|
|
if err := dec.Decode(&metric); err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case ch <- metric:
|
|
}
|
|
}
|
|
}(ch)
|
|
return ch, nil
|
|
}
|
|
|
|
func (client *peerRESTClient) SpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTSize, strconv.Itoa(opts.objectSize))
|
|
values.Set(peerRESTConcurrent, strconv.Itoa(opts.concurrency))
|
|
values.Set(peerRESTDuration, opts.duration.String())
|
|
values.Set(peerRESTStorageClass, opts.storageClass)
|
|
values.Set(peerRESTBucket, opts.bucketName)
|
|
|
|
respBody, err := client.callWithContext(context.Background(), peerRESTMethodSpeedTest, values, nil, -1)
|
|
if err != nil {
|
|
return SpeedTestResult{}, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
waitReader, err := waitForHTTPResponse(respBody)
|
|
if err != nil {
|
|
return SpeedTestResult{}, err
|
|
}
|
|
|
|
var result SpeedTestResult
|
|
err = gob.NewDecoder(waitReader).Decode(&result)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
if result.Error != "" {
|
|
return result, errors.New(result.Error)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (client *peerRESTClient) DriveSpeedTest(ctx context.Context, opts madmin.DriveSpeedTestOpts) (madmin.DriveSpeedTestResult, error) {
|
|
queryVals := make(url.Values)
|
|
if opts.Serial {
|
|
queryVals.Set("serial", "true")
|
|
}
|
|
queryVals.Set("blocksize", strconv.FormatUint(opts.BlockSize, 10))
|
|
queryVals.Set("filesize", strconv.FormatUint(opts.FileSize, 10))
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDriveSpeedTest, queryVals, nil, -1)
|
|
if err != nil {
|
|
return madmin.DriveSpeedTestResult{}, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
waitReader, err := waitForHTTPResponse(respBody)
|
|
if err != nil {
|
|
return madmin.DriveSpeedTestResult{}, err
|
|
}
|
|
|
|
var result madmin.DriveSpeedTestResult
|
|
err = gob.NewDecoder(waitReader).Decode(&result)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
if result.Error != "" {
|
|
return result, errors.New(result.Error)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (client *peerRESTClient) ReloadSiteReplicationConfig(ctx context.Context) error {
|
|
respBody, err := client.callWithContext(context.Background(), peerRESTMethodReloadSiteReplicationConfig, nil, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) GetLastDayTierStats(ctx context.Context) (DailyAllTierStats, error) {
|
|
var result map[string]lastDayTierStats
|
|
respBody, err := client.callWithContext(context.Background(), peerRESTMethodGetLastDayTierStats, nil, nil, -1)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
|
|
err = gob.NewDecoder(respBody).Decode(&result)
|
|
if err != nil {
|
|
return DailyAllTierStats{}, err
|
|
}
|
|
return DailyAllTierStats(result), nil
|
|
}
|
|
|
|
// DevNull - Used by netperf to pump data to peer
|
|
func (client *peerRESTClient) DevNull(ctx context.Context, r io.Reader) error {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDevNull, nil, r, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
return err
|
|
}
|
|
|
|
// Netperf - To initiate netperf on peer
|
|
func (client *peerRESTClient) Netperf(ctx context.Context, duration time.Duration) (madmin.NetperfNodeResult, error) {
|
|
var result madmin.NetperfNodeResult
|
|
values := make(url.Values)
|
|
values.Set(peerRESTDuration, duration.String())
|
|
respBody, err := client.callWithContext(context.Background(), peerRESTMethodNetperf, values, nil, -1)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
defer xhttp.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&result)
|
|
return result, err
|
|
}
|