mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
ac07df2985
start watcher after all creds have been loaded to avoid any conflicting locks that might get deadlocked. Deprecate unused peer calls for LoadUsers()
1144 lines
31 KiB
Go
1144 lines
31 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/gob"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"math/rand"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/cmd/rest"
|
|
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
|
|
"github.com/minio/minio/pkg/bucket/lifecycle"
|
|
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
|
|
"github.com/minio/minio/pkg/bucket/policy"
|
|
"github.com/minio/minio/pkg/event"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
xnet "github.com/minio/minio/pkg/net"
|
|
trace "github.com/minio/minio/pkg/trace"
|
|
)
|
|
|
|
// Binary size units, expressed in bytes.
const (
	kiB int64 = 1 << 10
	miB int64 = 1 << 20
	giB int64 = 1 << 30
)
|
|
|
|
// client to talk to peer Nodes.
|
|
type peerRESTClient struct {
|
|
host *xnet.Host
|
|
restClient *rest.Client
|
|
connected int32
|
|
}
|
|
|
|
// Reconnect to a peer rest server.
|
|
func (client *peerRESTClient) reConnect() {
|
|
atomic.StoreInt32(&client.connected, 1)
|
|
}
|
|
|
|
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
|
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
|
// after verifying format.json
|
|
func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
|
return client.callWithContext(context.Background(), method, values, body, length)
|
|
}
|
|
|
|
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
|
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
|
// after verifying format.json
|
|
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
|
if !client.IsOnline() {
|
|
client.reConnect()
|
|
}
|
|
|
|
if values == nil {
|
|
values = make(url.Values)
|
|
}
|
|
|
|
respBody, err = client.restClient.CallWithContext(ctx, method, values, body, length)
|
|
if err == nil {
|
|
return respBody, nil
|
|
}
|
|
|
|
if isNetworkError(err) {
|
|
atomic.StoreInt32(&client.connected, 0)
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Stringer provides a canonicalized representation of node.
|
|
func (client *peerRESTClient) String() string {
|
|
return client.host.String()
|
|
}
|
|
|
|
// IsOnline - returns whether RPC client failed to connect or not.
|
|
func (client *peerRESTClient) IsOnline() bool {
|
|
return atomic.LoadInt32(&client.connected) == 1
|
|
}
|
|
|
|
// Close - marks the client as closed.
|
|
func (client *peerRESTClient) Close() error {
|
|
atomic.StoreInt32(&client.connected, 0)
|
|
client.restClient.Close()
|
|
return nil
|
|
}
|
|
|
|
// GetLocksResp stores various info from the client for each lock that is requested.
|
|
type GetLocksResp []map[string][]lockRequesterInfo
|
|
|
|
// NetReadPerfInfo - fetch network read performance information for a remote node.
|
|
func (client *peerRESTClient) NetReadPerfInfo(size int64) (info ServerNetReadPerfInfo, err error) {
|
|
params := make(url.Values)
|
|
params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10))
|
|
respBody, err := client.call(
|
|
peerRESTMethodNetReadPerfInfo,
|
|
params,
|
|
rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
size,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// CollectNetPerfInfo - collect network performance information of other peers.
|
|
func (client *peerRESTClient) CollectNetPerfInfo(size int64) (info []ServerNetReadPerfInfo, err error) {
|
|
params := make(url.Values)
|
|
params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10))
|
|
respBody, err := client.call(peerRESTMethodCollectNetPerfInfo, params, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// GetLocks - fetch older locks for a remote node.
|
|
func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
|
|
respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&locks)
|
|
return locks, err
|
|
}
|
|
|
|
// ServerInfo - fetch server information for a remote node.
|
|
func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err error) {
|
|
respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// CPULoadInfo - fetch CPU information for a remote node.
|
|
func (client *peerRESTClient) CPULoadInfo() (info ServerCPULoadInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodCPULoadInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// CPUInfo - fetch CPU hardware information for a remote node.
|
|
func (client *peerRESTClient) CPUInfo() (info madmin.ServerCPUHardwareInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodHardwareCPUInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// NetworkInfo - fetch network hardware information for a remote node.
|
|
func (client *peerRESTClient) NetworkInfo() (info madmin.ServerNetworkHardwareInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodHardwareNetworkInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// networkOverloadedErr is the error type behind networkOverloaded.
type networkOverloadedErr struct{}

// networkOverloaded is the sentinel value callers compare against; it is
// sent by the slow-sample accounting in doNetOBDTest.
var networkOverloaded = networkOverloadedErr{}

// Error implements the error interface.
func (networkOverloadedErr) Error() string {
	return "network overloaded"
}
|
|
|
|
// progressReader wraps an io.Reader and publishes the byte count of every
// read that did not fail on progressChan.
type progressReader struct {
	r            io.Reader
	progressChan chan int64
}

// Read forwards to the wrapped reader. The number of bytes read is sent on
// progressChan when the read succeeded or ended with io.EOF; for any other
// error nothing is sent. The send blocks until the channel accepts the value.
func (p *progressReader) Read(b []byte) (int, error) {
	n, err := p.r.Read(b)
	if err == nil || err == io.EOF {
		p.progressChan <- int64(n)
	}
	return n, err
}
|
|
|
|
func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) {
|
|
latencies := []float64{}
|
|
throughputs := []float64{}
|
|
|
|
buf := make([]byte, dataSize)
|
|
|
|
buflimiter := make(chan struct{}, threadCount)
|
|
errChan := make(chan error, threadCount)
|
|
|
|
totalTransferred := int64(0)
|
|
transferChan := make(chan int64, threadCount)
|
|
go func() {
|
|
for v := range transferChan {
|
|
atomic.AddInt64(&totalTransferred, v)
|
|
}
|
|
}()
|
|
|
|
// ensure enough samples to obtain normal distribution
|
|
maxSamples := int(10 * threadCount)
|
|
|
|
innerCtx, cancel := context.WithCancel(ctx)
|
|
|
|
slowSamples := int32(0)
|
|
maxSlowSamples := int32(maxSamples / 20)
|
|
slowSample := func() {
|
|
if slowSamples > maxSlowSamples { // 5% of total
|
|
return
|
|
}
|
|
if atomic.AddInt32(&slowSamples, 1) >= maxSlowSamples {
|
|
errChan <- networkOverloaded
|
|
cancel()
|
|
}
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
finish := func() {
|
|
<-buflimiter
|
|
wg.Done()
|
|
}
|
|
|
|
for i := 0; i < maxSamples; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return info, ctx.Err()
|
|
case err = <-errChan:
|
|
case buflimiter <- struct{}{}:
|
|
wg.Add(1)
|
|
|
|
if innerCtx.Err() != nil {
|
|
finish()
|
|
continue
|
|
}
|
|
|
|
go func(i int) {
|
|
bufReader := bytes.NewReader(buf)
|
|
bufReadCloser := ioutil.NopCloser(&progressReader{
|
|
r: bufReader,
|
|
progressChan: transferChan,
|
|
})
|
|
start := time.Now()
|
|
before := atomic.LoadInt64(&totalTransferred)
|
|
|
|
ctx, cancel := context.WithTimeout(innerCtx, 10*time.Second)
|
|
defer cancel()
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodNetOBDInfo, nil, bufReadCloser, dataSize)
|
|
if err != nil {
|
|
|
|
if netErr, ok := err.(*rest.NetworkError); ok {
|
|
if urlErr, ok := netErr.Err.(*url.Error); ok {
|
|
if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
|
|
slowSample()
|
|
finish()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
errChan <- err
|
|
finish()
|
|
return
|
|
}
|
|
http.DrainBody(respBody)
|
|
|
|
after := atomic.LoadInt64(&totalTransferred)
|
|
finish()
|
|
end := time.Now()
|
|
|
|
latency := float64(end.Sub(start).Seconds())
|
|
|
|
if latency > maxLatencyForSizeThreads(dataSize, threadCount) {
|
|
slowSample()
|
|
}
|
|
|
|
/* Throughput = (total data transferred across all threads / time taken) */
|
|
throughput := float64(float64((after - before)) / latency)
|
|
|
|
latencies = append(latencies, latency)
|
|
throughputs = append(throughputs, throughput)
|
|
}(i)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
|
|
if err != nil {
|
|
return info, err
|
|
}
|
|
|
|
latency, throughput, err := xnet.ComputeOBDStats(latencies, throughputs)
|
|
info = madmin.NetOBDInfo{
|
|
Latency: latency,
|
|
Throughput: throughput,
|
|
}
|
|
return info, err
|
|
|
|
}
|
|
|
|
func maxLatencyForSizeThreads(size int64, threadCount uint) float64 {
|
|
Gbit100 := 12.5 * float64(giB)
|
|
Gbit40 := 5.00 * float64(giB)
|
|
Gbit25 := 3.25 * float64(giB)
|
|
Gbit10 := 1.25 * float64(giB)
|
|
// Gbit1 := 0.25 * float64(giB)
|
|
|
|
// Given the current defaults, each combination of size/thread
|
|
// is supposed to fully saturate the intended pipe when all threads are active
|
|
// i.e. if the test is performed in a perfectly controlled environment, i.e. without
|
|
// CPU scheduling latencies and/or network jitters, then all threads working
|
|
// simultaneously should result in each of them completing in 1s
|
|
//
|
|
// In reality, I've assumed a normal distribution of latency with expected mean of 1s and min of 0s
|
|
// Then, 95% of threads should complete within 2 seconds (2 std. deviations from the mean). The 2s comes
|
|
// from fitting the normal curve such that the mean is 1.
|
|
//
|
|
// i.e. we expect that no more than 5% of threads to take longer than 2s to push the data.
|
|
//
|
|
// throughput | max latency
|
|
// 100 Gbit | 2s
|
|
// 40 Gbit | 2s
|
|
// 25 Gbit | 2s
|
|
// 10 Gbit | 2s
|
|
// 1 Gbit | inf
|
|
|
|
throughput := float64(int64(size) * int64(threadCount))
|
|
if throughput >= Gbit100 {
|
|
return 2.0
|
|
} else if throughput >= Gbit40 {
|
|
return 2.0
|
|
} else if throughput >= Gbit25 {
|
|
return 2.0
|
|
} else if throughput >= Gbit10 {
|
|
return 2.0
|
|
}
|
|
return math.MaxFloat64
|
|
}
|
|
|
|
// NetOBDInfo - fetch Net OBD information for a remote node.
|
|
func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOBDInfo, err error) {
|
|
|
|
// 100 Gbit -> 256 MiB * 50 threads
|
|
// 40 Gbit -> 256 MiB * 20 threads
|
|
// 25 Gbit -> 128 MiB * 25 threads
|
|
// 10 Gbit -> 128 MiB * 10 threads
|
|
// 1 Gbit -> 64 MiB * 2 threads
|
|
|
|
type step struct {
|
|
size int64
|
|
threads uint
|
|
}
|
|
steps := []step{
|
|
{ // 100 Gbit
|
|
size: 256 * miB,
|
|
threads: 50,
|
|
},
|
|
{ // 40 Gbit
|
|
size: 256 * miB,
|
|
threads: 20,
|
|
},
|
|
{ // 25 Gbit
|
|
size: 128 * miB,
|
|
threads: 25,
|
|
},
|
|
{ // 10 Gbit
|
|
size: 128 * miB,
|
|
threads: 10,
|
|
},
|
|
{ // 1 Gbit
|
|
size: 64 * miB,
|
|
threads: 2,
|
|
},
|
|
}
|
|
|
|
for i := range steps {
|
|
size := steps[i].size
|
|
threads := steps[i].threads
|
|
|
|
if info, err = client.doNetOBDTest(ctx, size, threads); err != nil {
|
|
if err == networkOverloaded {
|
|
continue
|
|
}
|
|
|
|
if netErr, ok := err.(*rest.NetworkError); ok {
|
|
if urlErr, ok := netErr.Err.(*url.Error); ok {
|
|
if urlErr.Err.Error() == context.Canceled.Error() {
|
|
continue
|
|
}
|
|
if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return info, err
|
|
}
|
|
return info, err
|
|
}
|
|
|
|
// DispatchNetOBDInfo - dispatch other nodes to run Net OBD.
|
|
func (client *peerRESTClient) DispatchNetOBDInfo(ctx context.Context) (info madmin.ServerNetOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDispatchNetOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return
|
|
}
|
|
|
|
// DriveOBDInfo - fetch Drive OBD information for a remote node.
|
|
func (client *peerRESTClient) DriveOBDInfo(ctx context.Context) (info madmin.ServerDrivesOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDriveOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// CPUOBDInfo - fetch CPU OBD information for a remote node.
|
|
func (client *peerRESTClient) CPUOBDInfo(ctx context.Context) (info madmin.ServerCPUOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodCPUOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// DiskHwOBDInfo - fetch Disk HW OBD information for a remote node.
|
|
func (client *peerRESTClient) DiskHwOBDInfo(ctx context.Context) (info madmin.ServerDiskHwOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodDiskHwOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// OsOBDInfo - fetch OsInfo OBD information for a remote node.
|
|
func (client *peerRESTClient) OsOBDInfo(ctx context.Context) (info madmin.ServerOsOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodOsInfoOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// MemOBDInfo - fetch MemInfo OBD information for a remote node.
|
|
func (client *peerRESTClient) MemOBDInfo(ctx context.Context) (info madmin.ServerMemOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodMemOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// ProcOBDInfo - fetch ProcInfo OBD information for a remote node.
|
|
func (client *peerRESTClient) ProcOBDInfo(ctx context.Context) (info madmin.ServerProcOBDInfo, err error) {
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodProcOBDInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// DrivePerfInfo - fetch Drive performance information for a remote node.
|
|
func (client *peerRESTClient) DrivePerfInfo(size int64) (info madmin.ServerDrivesPerfInfo, err error) {
|
|
params := make(url.Values)
|
|
params.Set(peerRESTDrivePerfSize, strconv.FormatInt(size, 10))
|
|
respBody, err := client.call(peerRESTMethodDrivePerfInfo, params, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// MemUsageInfo - fetch memory usage information for a remote node.
|
|
func (client *peerRESTClient) MemUsageInfo() (info ServerMemUsageInfo, err error) {
|
|
respBody, err := client.call(peerRESTMethodMemUsageInfo, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&info)
|
|
return info, err
|
|
}
|
|
|
|
// StartProfiling - Issues profiling command on the peer node.
|
|
func (client *peerRESTClient) StartProfiling(profiler string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTProfiler, profiler)
|
|
respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DownloadProfileData - download profiled data from a remote node.
|
|
func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) {
|
|
respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&data)
|
|
return data, err
|
|
}
|
|
|
|
// DeleteBucket - Delete notification and policies related to the bucket.
|
|
func (client *peerRESTClient) DeleteBucket(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodDeleteBucket, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// ReloadFormat - reload format on the peer node.
|
|
func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
|
|
values := make(url.Values)
|
|
if dryRun {
|
|
values.Set(peerRESTDryRun, "true")
|
|
} else {
|
|
values.Set(peerRESTDryRun, "false")
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodReloadFormat, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SendEvent - calls send event RPC.
|
|
func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
|
|
numTries := 10
|
|
for {
|
|
err := client.sendEvent(bucket, targetID, remoteTargetID, eventData)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if numTries == 0 {
|
|
return err
|
|
}
|
|
numTries--
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
|
|
func (client *peerRESTClient) sendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
|
|
args := sendEventRequest{
|
|
TargetID: remoteTargetID,
|
|
Event: eventData,
|
|
}
|
|
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
respBody, err := client.call(peerRESTMethodSendEvent, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var eventResp sendEventResp
|
|
defer http.DrainBody(respBody)
|
|
err = gob.NewDecoder(respBody).Decode(&eventResp)
|
|
|
|
if err != nil || !eventResp.Success {
|
|
reqInfo := &logger.ReqInfo{BucketName: bucket}
|
|
reqInfo.AppendTags("targetID", targetID.Name)
|
|
reqInfo.AppendTags("event", eventData.EventName.String())
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
globalNotificationSys.RemoveRemoteTarget(bucket, targetID)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// RemoteTargetExist - calls remote target ID exist REST API.
|
|
func (client *peerRESTClient) RemoteTargetExist(bucket string, targetID event.TargetID) (bool, error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(targetID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodTargetExists, values, &reader, -1)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
var targetExists remoteTargetExistsResp
|
|
err = gob.NewDecoder(respBody).Decode(&targetExists)
|
|
return targetExists.Exists, err
|
|
}
|
|
|
|
// RemoveBucketPolicy - Remove bucket policy on the peer node.
|
|
func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodBucketPolicyRemove, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// RemoveBucketObjectLockConfig - Remove bucket object lock config on the peer node.
|
|
func (client *peerRESTClient) RemoveBucketObjectLockConfig(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodBucketObjectLockConfigRemove, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SetBucketPolicy - Set bucket policy on the peer node.
|
|
func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(bucketPolicy)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodBucketPolicySet, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// RemoveBucketLifecycle - Remove bucket lifecycle configuration on the peer node
|
|
func (client *peerRESTClient) RemoveBucketLifecycle(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodBucketLifecycleRemove, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SetBucketLifecycle - Set bucket lifecycle configuration on the peer node
|
|
func (client *peerRESTClient) SetBucketLifecycle(bucket string, bucketLifecycle *lifecycle.Lifecycle) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(bucketLifecycle)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodBucketLifecycleSet, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// RemoveBucketSSEConfig - Remove bucket encryption configuration on the peer node
|
|
func (client *peerRESTClient) RemoveBucketSSEConfig(bucket string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
respBody, err := client.call(peerRESTMethodBucketEncryptionRemove, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SetBucketSSEConfig - Set bucket encryption configuration on the peer node
|
|
func (client *peerRESTClient) SetBucketSSEConfig(bucket string, encConfig *bucketsse.BucketSSEConfig) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(encConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodBucketEncryptionSet, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// PutBucketNotification - Put bucket notification on the peer node.
|
|
func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap event.RulesMap) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(&rulesMap)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodBucketNotificationPut, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// PutBucketObjectLockConfig - PUT bucket object lock configuration.
|
|
func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention objectlock.Retention) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTBucket, bucket)
|
|
|
|
var reader bytes.Buffer
|
|
err := gob.NewEncoder(&reader).Encode(&retention)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodPutBucketObjectLockConfig, values, &reader, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeletePolicy - delete a specific canned policy.
|
|
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTPolicy, policyName)
|
|
|
|
respBody, err := client.call(peerRESTMethodDeletePolicy, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadPolicy - reload a specific canned policy.
|
|
func (client *peerRESTClient) LoadPolicy(policyName string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTPolicy, policyName)
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadPolicy, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadPolicyMapping - reload a specific policy mapping
|
|
func (client *peerRESTClient) LoadPolicyMapping(userOrGroup string, isGroup bool) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUserOrGroup, userOrGroup)
|
|
if isGroup {
|
|
values.Set(peerRESTIsGroup, "")
|
|
}
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadPolicyMapping, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// DeleteUser - delete a specific user.
|
|
func (client *peerRESTClient) DeleteUser(accessKey string) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
|
|
respBody, err := client.call(peerRESTMethodDeleteUser, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadUser - reload a specific user.
|
|
func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUser, accessKey)
|
|
values.Set(peerRESTUserTemp, strconv.FormatBool(temp))
|
|
|
|
respBody, err := client.call(peerRESTMethodLoadUser, values, nil, -1)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// LoadGroup - send load group command to peers.
|
|
func (client *peerRESTClient) LoadGroup(group string) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTGroup, group)
|
|
respBody, err := client.call(peerRESTMethodLoadGroup, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// ServerUpdate - sends server update message to remote peers.
|
|
func (client *peerRESTClient) ServerUpdate(updateURL, sha256Hex string, latestReleaseTime time.Time) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTUpdateURL, updateURL)
|
|
values.Set(peerRESTSha256Hex, sha256Hex)
|
|
if !latestReleaseTime.IsZero() {
|
|
values.Set(peerRESTLatestRelease, latestReleaseTime.Format(time.RFC3339))
|
|
} else {
|
|
values.Set(peerRESTLatestRelease, "")
|
|
}
|
|
respBody, err := client.call(peerRESTMethodServerUpdate, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
// SignalService - sends signal to peer nodes.
|
|
func (client *peerRESTClient) SignalService(sig serviceSignal) error {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTSignal, strconv.Itoa(int(sig)))
|
|
respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
return nil
|
|
}
|
|
|
|
func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) {
|
|
respBody, err := client.call(peerRESTMethodBackgroundHealStatus, nil, nil, -1)
|
|
if err != nil {
|
|
return madmin.BgHealState{}, err
|
|
}
|
|
defer http.DrainBody(respBody)
|
|
|
|
state := madmin.BgHealState{}
|
|
err = gob.NewDecoder(respBody).Decode(&state)
|
|
return state, err
|
|
}
|
|
|
|
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
|
|
values := make(url.Values)
|
|
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
|
|
values.Set(peerRESTTraceErr, strconv.FormatBool(trcErr))
|
|
|
|
// To cancel the REST request in case doneCh gets closed.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
cancelCh := make(chan struct{})
|
|
defer close(cancelCh)
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
case <-cancelCh:
|
|
// There was an error in the REST request.
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1)
|
|
defer http.DrainBody(respBody)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
for {
|
|
var info trace.Info
|
|
if err = dec.Decode(&info); err != nil {
|
|
return
|
|
}
|
|
if len(info.NodeName) > 0 {
|
|
select {
|
|
case traceCh <- info:
|
|
default:
|
|
// Do not block on slow receivers.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
|
|
// To cancel the REST request in case doneCh gets closed.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
cancelCh := make(chan struct{})
|
|
defer close(cancelCh)
|
|
go func() {
|
|
select {
|
|
case <-doneCh:
|
|
case <-cancelCh:
|
|
// There was an error in the REST request.
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodListen, v, nil, -1)
|
|
defer http.DrainBody(respBody)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
dec := gob.NewDecoder(respBody)
|
|
for {
|
|
var ev event.Event
|
|
if err = dec.Decode(&ev); err != nil {
|
|
return
|
|
}
|
|
if len(ev.EventVersion) > 0 {
|
|
select {
|
|
case listenCh <- ev:
|
|
default:
|
|
// Do not block on slow receivers.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Listen - listen on peers.
|
|
func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
|
|
go func() {
|
|
for {
|
|
client.doListen(listenCh, doneCh, v)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry after sometime as probably the peer is down.
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Trace - send http trace request to peer nodes
|
|
func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
|
|
go func() {
|
|
for {
|
|
client.doTrace(traceCh, doneCh, trcAll, trcErr)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry after sometime as probably the peer is down.
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// ConsoleLog - sends request to peer nodes to get console logs
|
|
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh chan struct{}) {
|
|
go func() {
|
|
for {
|
|
// get cancellation context to properly unsubscribe peers
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
|
|
if err != nil {
|
|
// Retry the failed request.
|
|
time.Sleep(5 * time.Second)
|
|
} else {
|
|
dec := gob.NewDecoder(respBody)
|
|
|
|
go func() {
|
|
<-doneCh
|
|
cancel()
|
|
}()
|
|
|
|
for {
|
|
var log madmin.LogInfo
|
|
if err = dec.Decode(&log); err != nil {
|
|
break
|
|
}
|
|
select {
|
|
case logCh <- log:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-doneCh:
|
|
cancel()
|
|
http.DrainBody(respBody)
|
|
return
|
|
default:
|
|
// There was error in the REST request, retry.
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func getRemoteHosts(endpointZones EndpointZones) []*xnet.Host {
|
|
var remoteHosts []*xnet.Host
|
|
for _, hostStr := range GetRemotePeers(endpointZones) {
|
|
host, err := xnet.ParseHost(hostStr)
|
|
if err != nil {
|
|
logger.LogIf(context.Background(), err)
|
|
continue
|
|
}
|
|
remoteHosts = append(remoteHosts, host)
|
|
}
|
|
|
|
return remoteHosts
|
|
}
|
|
|
|
func getRestClients(endpoints EndpointZones) []*peerRESTClient {
|
|
peerHosts := getRemoteHosts(endpoints)
|
|
restClients := make([]*peerRESTClient, len(peerHosts))
|
|
for i, host := range peerHosts {
|
|
client, err := newPeerRESTClient(host)
|
|
if err != nil {
|
|
logger.LogIf(context.Background(), err)
|
|
continue
|
|
}
|
|
restClients[i] = client
|
|
}
|
|
|
|
return restClients
|
|
}
|
|
|
|
// Returns a peer rest client.
|
|
func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
|
|
|
|
scheme := "http"
|
|
if globalIsSSL {
|
|
scheme = "https"
|
|
}
|
|
|
|
serverURL := &url.URL{
|
|
Scheme: scheme,
|
|
Host: peer.String(),
|
|
Path: peerRESTPath,
|
|
}
|
|
|
|
var tlsConfig *tls.Config
|
|
if globalIsSSL {
|
|
tlsConfig = &tls.Config{
|
|
ServerName: peer.Name,
|
|
RootCAs: globalRootCAs,
|
|
}
|
|
}
|
|
|
|
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)
|
|
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &peerRESTClient{host: peer, restClient: restClient, connected: 1}, nil
|
|
}
|