mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
95bf4a57b6
Create new code paths for multiple subsystems in the code. This will make maintaing this easier later. Also introduce bugLogIf() for errors that should not happen in the first place.
1376 lines
52 KiB
Go
1376 lines
52 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/klauspost/compress/zstd"
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/bucket/bandwidth"
|
|
b "github.com/minio/minio/internal/bucket/bandwidth"
|
|
"github.com/minio/minio/internal/event"
|
|
"github.com/minio/minio/internal/grid"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/minio/internal/pubsub"
|
|
"github.com/minio/mux"
|
|
"github.com/minio/pkg/v2/logger/message/log"
|
|
)
|
|
|
|
// To abstract a node over network.
|
|
type peerRESTServer struct{}
|
|
|
|
var (
|
|
// Types & Wrappers
|
|
aoBucketInfo = grid.NewArrayOf[*BucketInfo](func() *BucketInfo { return &BucketInfo{} })
|
|
aoMetricsGroup = grid.NewArrayOf[*MetricV2](func() *MetricV2 { return &MetricV2{} })
|
|
madminBgHealState = grid.NewJSONPool[madmin.BgHealState]()
|
|
madminCPUs = grid.NewJSONPool[madmin.CPUs]()
|
|
madminMemInfo = grid.NewJSONPool[madmin.MemInfo]()
|
|
madminNetInfo = grid.NewJSONPool[madmin.NetInfo]()
|
|
madminOSInfo = grid.NewJSONPool[madmin.OSInfo]()
|
|
madminPartitions = grid.NewJSONPool[madmin.Partitions]()
|
|
madminProcInfo = grid.NewJSONPool[madmin.ProcInfo]()
|
|
madminRealtimeMetrics = grid.NewJSONPool[madmin.RealtimeMetrics]()
|
|
madminServerProperties = grid.NewJSONPool[madmin.ServerProperties]()
|
|
madminStorageInfo = grid.NewJSONPool[madmin.StorageInfo]()
|
|
madminSysConfig = grid.NewJSONPool[madmin.SysConfig]()
|
|
madminSysErrors = grid.NewJSONPool[madmin.SysErrors]()
|
|
madminSysServices = grid.NewJSONPool[madmin.SysServices]()
|
|
|
|
// Request -> Response RPC calls
|
|
deleteBucketMetadataRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeleteBucketMetadata, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
deleteBucketRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeleteBucket, grid.NewMSS, grid.NewNoPayload)
|
|
deletePolicyRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeletePolicy, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
deleteSvcActRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeleteServiceAccount, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
deleteUserRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeleteUser, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
getAllBucketStatsRPC = grid.NewSingleHandler[*grid.MSS, *BucketStatsMap](grid.HandlerGetAllBucketStats, grid.NewMSS, func() *BucketStatsMap { return &BucketStatsMap{} })
|
|
getBackgroundHealStatusRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.BgHealState]](grid.HandlerBackgroundHealStatus, grid.NewMSS, madminBgHealState.NewJSON)
|
|
getBandwidthRPC = grid.NewSingleHandler[*grid.URLValues, *bandwidth.BucketBandwidthReport](grid.HandlerGetBandwidth, grid.NewURLValues, func() *bandwidth.BucketBandwidthReport { return &bandwidth.BucketBandwidthReport{} })
|
|
getBucketStatsRPC = grid.NewSingleHandler[*grid.MSS, *BucketStats](grid.HandlerGetBucketStats, grid.NewMSS, func() *BucketStats { return &BucketStats{} })
|
|
getCPUsHandler = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.CPUs]](grid.HandlerGetCPUs, grid.NewMSS, madminCPUs.NewJSON)
|
|
getLastDayTierStatsRPC = grid.NewSingleHandler[*grid.MSS, *DailyAllTierStats](grid.HandlerGetLastDayTierStats, grid.NewMSS, func() *DailyAllTierStats { return &DailyAllTierStats{} })
|
|
getLocksRPC = grid.NewSingleHandler[*grid.MSS, *localLockMap](grid.HandlerGetLocks, grid.NewMSS, func() *localLockMap { return &localLockMap{} })
|
|
getMemInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.MemInfo]](grid.HandlerGetMemInfo, grid.NewMSS, madminMemInfo.NewJSON)
|
|
getMetacacheListingRPC = grid.NewSingleHandler[*listPathOptions, *metacache](grid.HandlerGetMetacacheListing, func() *listPathOptions { return &listPathOptions{} }, func() *metacache { return &metacache{} })
|
|
getMetricsRPC = grid.NewSingleHandler[*grid.URLValues, *grid.JSON[madmin.RealtimeMetrics]](grid.HandlerGetMetrics, grid.NewURLValues, madminRealtimeMetrics.NewJSON)
|
|
getNetInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.NetInfo]](grid.HandlerGetNetInfo, grid.NewMSS, madminNetInfo.NewJSON)
|
|
getOSInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.OSInfo]](grid.HandlerGetOSInfo, grid.NewMSS, madminOSInfo.NewJSON)
|
|
getPartitionsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.Partitions]](grid.HandlerGetPartitions, grid.NewMSS, madminPartitions.NewJSON)
|
|
getPeerBucketMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetPeerBucketMetrics, grid.NewMSS, aoMetricsGroup.New)
|
|
getPeerMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetPeerMetrics, grid.NewMSS, aoMetricsGroup.New)
|
|
getResourceMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetResourceMetrics, grid.NewMSS, aoMetricsGroup.New)
|
|
getProcInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ProcInfo]](grid.HandlerGetProcInfo, grid.NewMSS, madminProcInfo.NewJSON)
|
|
getSRMetricsRPC = grid.NewSingleHandler[*grid.MSS, *SRMetricsSummary](grid.HandlerGetSRMetrics, grid.NewMSS, func() *SRMetricsSummary { return &SRMetricsSummary{} })
|
|
getSysConfigRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysConfig]](grid.HandlerGetSysConfig, grid.NewMSS, madminSysConfig.NewJSON)
|
|
getSysErrorsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysErrors]](grid.HandlerGetSysErrors, grid.NewMSS, madminSysErrors.NewJSON)
|
|
getSysServicesRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysServices]](grid.HandlerGetSysServices, grid.NewMSS, madminSysServices.NewJSON)
|
|
headBucketRPC = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerHeadBucket, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
|
|
healBucketRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerHealBucket, grid.NewMSS, grid.NewNoPayload)
|
|
listBucketsRPC = grid.NewSingleHandler[*BucketOptions, *grid.Array[*BucketInfo]](grid.HandlerListBuckets, func() *BucketOptions { return &BucketOptions{} }, aoBucketInfo.New)
|
|
loadBucketMetadataRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadBucketMetadata, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
loadGroupRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadGroup, grid.NewMSS, grid.NewNoPayload)
|
|
loadPolicyMappingRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadPolicyMapping, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
loadPolicyRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadPolicy, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
loadRebalanceMetaRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadRebalanceMeta, grid.NewMSS, grid.NewNoPayload)
|
|
loadSvcActRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadServiceAccount, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
loadTransitionTierConfigRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadTransitionTierConfig, grid.NewMSS, grid.NewNoPayload)
|
|
loadUserRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerLoadUser, grid.NewMSS, grid.NewNoPayload).IgnoreNilConn()
|
|
localStorageInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.StorageInfo]](grid.HandlerStorageInfo, grid.NewMSS, madminStorageInfo.NewJSON)
|
|
makeBucketRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerMakeBucket, grid.NewMSS, grid.NewNoPayload)
|
|
reloadPoolMetaRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerReloadPoolMeta, grid.NewMSS, grid.NewNoPayload)
|
|
reloadSiteReplicationConfigRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerReloadSiteReplicationConfig, grid.NewMSS, grid.NewNoPayload)
|
|
serverInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ServerProperties]](grid.HandlerServerInfo, grid.NewMSS, madminServerProperties.NewJSON)
|
|
signalServiceRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerSignalService, grid.NewMSS, grid.NewNoPayload)
|
|
stopRebalanceRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerStopRebalance, grid.NewMSS, grid.NewNoPayload)
|
|
updateMetacacheListingRPC = grid.NewSingleHandler[*metacache, *metacache](grid.HandlerUpdateMetacacheListing, func() *metacache { return &metacache{} }, func() *metacache { return &metacache{} })
|
|
|
|
// STREAMS
|
|
// Set an output capacity of 100 for consoleLog and listenRPC
|
|
// There is another buffer that will buffer events.
|
|
consoleLogRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *grid.Bytes](grid.HandlerConsoleLog, grid.NewMSS, nil, grid.NewBytes).WithOutCapacity(100)
|
|
listenRPC = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100)
|
|
)
|
|
|
|
// GetLocksHandler - returns list of lock from the server.
|
|
func (s *peerRESTServer) GetLocksHandler(_ *grid.MSS) (*localLockMap, *grid.RemoteErr) {
|
|
res := globalLockServer.DupLockMap()
|
|
return &res, nil
|
|
}
|
|
|
|
// DeletePolicyHandler - deletes a policy on the server.
|
|
func (s *peerRESTServer) DeletePolicyHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
policyName := mss.Get(peerRESTPolicy)
|
|
if policyName == "" {
|
|
return np, grid.NewRemoteErr(errors.New("policyName is missing"))
|
|
}
|
|
|
|
if err := globalIAMSys.DeletePolicy(context.Background(), policyName, false); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LoadPolicyHandler - reloads a policy on the server.
|
|
func (s *peerRESTServer) LoadPolicyHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
policyName := mss.Get(peerRESTPolicy)
|
|
if policyName == "" {
|
|
return np, grid.NewRemoteErr(errors.New("policyName is missing"))
|
|
}
|
|
|
|
if err := globalIAMSys.LoadPolicy(context.Background(), objAPI, policyName); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LoadPolicyMappingHandler - reloads a policy mapping on the server.
|
|
func (s *peerRESTServer) LoadPolicyMappingHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
userOrGroup := mss.Get(peerRESTUserOrGroup)
|
|
if userOrGroup == "" {
|
|
return np, grid.NewRemoteErr(errors.New("user-or-group is missing"))
|
|
}
|
|
|
|
userType, err := strconv.Atoi(mss.Get(peerRESTUserType))
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(fmt.Errorf("user-type `%s` is invalid: %w", mss.Get(peerRESTUserType), err))
|
|
}
|
|
|
|
isGroup := mss.Get(peerRESTIsGroup) == "true"
|
|
if err := globalIAMSys.LoadPolicyMapping(context.Background(), objAPI, userOrGroup, IAMUserType(userType), isGroup); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// DeleteServiceAccountHandler - deletes a service account on the server.
|
|
func (s *peerRESTServer) DeleteServiceAccountHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
accessKey := mss.Get(peerRESTUser)
|
|
if accessKey == "" {
|
|
return np, grid.NewRemoteErr(errors.New("service account name is missing"))
|
|
}
|
|
|
|
if err := globalIAMSys.DeleteServiceAccount(context.Background(), accessKey, false); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LoadServiceAccountHandler - reloads a service account on the server.
|
|
func (s *peerRESTServer) LoadServiceAccountHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
accessKey := mss.Get(peerRESTUser)
|
|
if accessKey == "" {
|
|
return np, grid.NewRemoteErr(errors.New("service account name is missing"))
|
|
}
|
|
|
|
if err := globalIAMSys.LoadServiceAccount(context.Background(), accessKey); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// DeleteUserHandler - deletes a user on the server.
|
|
func (s *peerRESTServer) DeleteUserHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
accessKey := mss.Get(peerRESTUser)
|
|
if accessKey == "" {
|
|
return np, grid.NewRemoteErr(errors.New("username is missing"))
|
|
}
|
|
|
|
if err := globalIAMSys.DeleteUser(context.Background(), accessKey, false); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LoadUserHandler - reloads a user on the server.
|
|
func (s *peerRESTServer) LoadUserHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
accessKey := mss.Get(peerRESTUser)
|
|
if accessKey == "" {
|
|
return np, grid.NewRemoteErr(errors.New("username is missing"))
|
|
}
|
|
|
|
temp, err := strconv.ParseBool(mss.Get(peerRESTUserTemp))
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
userType := regUser
|
|
if temp {
|
|
userType = stsUser
|
|
}
|
|
|
|
if err = globalIAMSys.LoadUser(context.Background(), objAPI, accessKey, userType); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LoadGroupHandler - reloads group along with members list.
|
|
func (s *peerRESTServer) LoadGroupHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
group := mss.Get(peerRESTGroup)
|
|
if group == "" {
|
|
return np, grid.NewRemoteErr(errors.New("group is missing"))
|
|
}
|
|
|
|
err := globalIAMSys.LoadGroup(context.Background(), objAPI, group)
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// StartProfilingHandler - Issues the start profiling command.
|
|
func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
profiles := strings.Split(vars[peerRESTProfiler], ",")
|
|
if len(profiles) == 0 {
|
|
s.writeErrorResponse(w, errors.New("profiler name is missing"))
|
|
return
|
|
}
|
|
globalProfilerMu.Lock()
|
|
defer globalProfilerMu.Unlock()
|
|
if globalProfiler == nil {
|
|
globalProfiler = make(map[string]minioProfiler, 10)
|
|
}
|
|
|
|
// Stop profiler of all types if already running
|
|
for k, v := range globalProfiler {
|
|
for _, p := range profiles {
|
|
if p == k {
|
|
v.Stop()
|
|
delete(globalProfiler, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, profiler := range profiles {
|
|
prof, err := startProfiler(profiler)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
globalProfiler[profiler] = prof
|
|
}
|
|
}
|
|
|
|
// DownloadProfilingDataHandler - returns profiled data.
|
|
func (s *peerRESTServer) DownloadProfilingDataHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
ctx := newContext(r, w, "DownloadProfiling")
|
|
profileData, err := getProfileData()
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
peersLogIf(ctx, gob.NewEncoder(w).Encode(profileData))
|
|
}
|
|
|
|
func (s *peerRESTServer) LocalStorageInfoHandler(mss *grid.MSS) (*grid.JSON[madmin.StorageInfo], *grid.RemoteErr) {
|
|
objLayer := newObjectLayerFn()
|
|
if objLayer == nil {
|
|
return nil, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
metrics, err := strconv.ParseBool(mss.Get(peerRESTMetrics))
|
|
if err != nil {
|
|
return nil, grid.NewRemoteErr(err)
|
|
}
|
|
info := objLayer.LocalStorageInfo(context.Background(), metrics)
|
|
return madminStorageInfo.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// ServerInfoHandler - returns Server Info
|
|
func (s *peerRESTServer) ServerInfoHandler(params *grid.MSS) (*grid.JSON[madmin.ServerProperties], *grid.RemoteErr) {
|
|
r := http.Request{Host: globalLocalNodeName}
|
|
metrics, err := strconv.ParseBool(params.Get(peerRESTMetrics))
|
|
if err != nil {
|
|
return nil, grid.NewRemoteErr(err)
|
|
}
|
|
info := getLocalServerProperty(globalEndpoints, &r, metrics)
|
|
return madminServerProperties.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetCPUsHandler - returns CPU info.
|
|
func (s *peerRESTServer) GetCPUsHandler(_ *grid.MSS) (*grid.JSON[madmin.CPUs], *grid.RemoteErr) {
|
|
info := madmin.GetCPUs(context.Background(), globalLocalNodeName)
|
|
return madminCPUs.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetNetInfoHandler - returns network information.
|
|
func (s *peerRESTServer) GetNetInfoHandler(_ *grid.MSS) (*grid.JSON[madmin.NetInfo], *grid.RemoteErr) {
|
|
info := madmin.GetNetInfo(globalLocalNodeName, globalInternodeInterface)
|
|
return madminNetInfo.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetPartitionsHandler - returns disk partition information.
|
|
func (s *peerRESTServer) GetPartitionsHandler(_ *grid.MSS) (*grid.JSON[madmin.Partitions], *grid.RemoteErr) {
|
|
info := madmin.GetPartitions(context.Background(), globalLocalNodeName)
|
|
return madminPartitions.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetOSInfoHandler - returns operating system's information.
|
|
func (s *peerRESTServer) GetOSInfoHandler(_ *grid.MSS) (*grid.JSON[madmin.OSInfo], *grid.RemoteErr) {
|
|
info := madmin.GetOSInfo(context.Background(), globalLocalNodeName)
|
|
return madminOSInfo.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetProcInfoHandler - returns this MinIO process information.
|
|
func (s *peerRESTServer) GetProcInfoHandler(_ *grid.MSS) (*grid.JSON[madmin.ProcInfo], *grid.RemoteErr) {
|
|
info := madmin.GetProcInfo(context.Background(), globalLocalNodeName)
|
|
return madminProcInfo.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetMemInfoHandler - returns memory information.
|
|
func (s *peerRESTServer) GetMemInfoHandler(_ *grid.MSS) (*grid.JSON[madmin.MemInfo], *grid.RemoteErr) {
|
|
info := madmin.GetMemInfo(context.Background(), globalLocalNodeName)
|
|
return madminMemInfo.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetMetricsHandler - returns server metrics.
|
|
func (s *peerRESTServer) GetMetricsHandler(v *grid.URLValues) (*grid.JSON[madmin.RealtimeMetrics], *grid.RemoteErr) {
|
|
values := v.Values()
|
|
var types madmin.MetricType
|
|
if t, _ := strconv.ParseUint(values.Get(peerRESTMetricsTypes), 10, 64); t != 0 {
|
|
types = madmin.MetricType(t)
|
|
} else {
|
|
types = madmin.MetricsAll
|
|
}
|
|
|
|
diskMap := make(map[string]struct{})
|
|
for _, disk := range values[peerRESTDisk] {
|
|
diskMap[disk] = struct{}{}
|
|
}
|
|
|
|
hostMap := make(map[string]struct{})
|
|
for _, host := range values[peerRESTHost] {
|
|
hostMap[host] = struct{}{}
|
|
}
|
|
|
|
info := collectLocalMetrics(types, collectMetricsOpts{
|
|
disks: diskMap,
|
|
hosts: hostMap,
|
|
jobID: values.Get(peerRESTJobID),
|
|
depID: values.Get(peerRESTDepID),
|
|
})
|
|
return madminRealtimeMetrics.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetSysConfigHandler - returns system config information.
|
|
// (only the config that are of concern to minio)
|
|
func (s *peerRESTServer) GetSysConfigHandler(_ *grid.MSS) (*grid.JSON[madmin.SysConfig], *grid.RemoteErr) {
|
|
info := madmin.GetSysConfig(context.Background(), globalLocalNodeName)
|
|
return madminSysConfig.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetSysServicesHandler - returns system services information.
|
|
// (only the services that are of concern to minio)
|
|
func (s *peerRESTServer) GetSysServicesHandler(_ *grid.MSS) (*grid.JSON[madmin.SysServices], *grid.RemoteErr) {
|
|
info := madmin.GetSysServices(context.Background(), globalLocalNodeName)
|
|
return madminSysServices.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// GetSysErrorsHandler - returns system level errors
|
|
func (s *peerRESTServer) GetSysErrorsHandler(_ *grid.MSS) (*grid.JSON[madmin.SysErrors], *grid.RemoteErr) {
|
|
info := madmin.GetSysErrors(context.Background(), globalLocalNodeName)
|
|
return madminSysErrors.NewJSONWith(&info), nil
|
|
}
|
|
|
|
// DeleteBucketMetadataHandler - Delete in memory bucket metadata
|
|
func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
bucketName := mss.Get(peerRESTBucket)
|
|
if bucketName == "" {
|
|
return np, grid.NewRemoteErr(errors.New("Bucket name is missing"))
|
|
}
|
|
|
|
globalReplicationStats.Delete(bucketName)
|
|
globalBucketMetadataSys.Remove(bucketName)
|
|
globalBucketTargetSys.Delete(bucketName)
|
|
globalEventNotifier.RemoveNotification(bucketName)
|
|
globalBucketConnStats.delete(bucketName)
|
|
globalBucketHTTPStats.delete(bucketName)
|
|
if localMetacacheMgr != nil {
|
|
localMetacacheMgr.deleteBucketCache(bucketName)
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer.
|
|
func (s *peerRESTServer) GetAllBucketStatsHandler(mss *grid.MSS) (*BucketStatsMap, *grid.RemoteErr) {
|
|
replicationStats := globalReplicationStats.GetAll()
|
|
bucketStatsMap := make(map[string]BucketStats, len(replicationStats))
|
|
for k, v := range replicationStats {
|
|
bucketStatsMap[k] = BucketStats{
|
|
ReplicationStats: v,
|
|
ProxyStats: globalReplicationStats.getProxyStats(k),
|
|
}
|
|
}
|
|
return &BucketStatsMap{Stats: bucketStatsMap, Timestamp: time.Now()}, nil
|
|
}
|
|
|
|
// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only
|
|
// returns BucketStats, that currently includes ReplicationStats.
|
|
func (s *peerRESTServer) GetBucketStatsHandler(vars *grid.MSS) (*BucketStats, *grid.RemoteErr) {
|
|
bucketName := vars.Get(peerRESTBucket)
|
|
if bucketName == "" {
|
|
return nil, grid.NewRemoteErrString("Bucket name is missing")
|
|
}
|
|
|
|
bs := BucketStats{
|
|
ReplicationStats: globalReplicationStats.Get(bucketName),
|
|
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}},
|
|
ProxyStats: globalReplicationStats.getProxyStats(bucketName),
|
|
}
|
|
return &bs, nil
|
|
}
|
|
|
|
// GetSRMetricsHandler - fetches current in-memory replication stats at site level from this peer
|
|
func (s *peerRESTServer) GetSRMetricsHandler(mss *grid.MSS) (*SRMetricsSummary, *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return nil, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
sm := globalReplicationStats.getSRMetricsForNode()
|
|
return &sm, nil
|
|
}
|
|
|
|
// LoadBucketMetadataHandler - reloads in memory bucket metadata
|
|
func (s *peerRESTServer) LoadBucketMetadataHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
bucketName := mss.Get(peerRESTBucket)
|
|
if bucketName == "" {
|
|
return np, grid.NewRemoteErr(errors.New("Bucket name is missing"))
|
|
}
|
|
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
meta, err := loadBucketMetadata(context.Background(), objAPI, bucketName)
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
globalBucketMetadataSys.Set(bucketName, meta)
|
|
|
|
if meta.notificationConfig != nil {
|
|
globalEventNotifier.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap())
|
|
}
|
|
|
|
if meta.bucketTargetConfig != nil {
|
|
globalBucketTargetSys.UpdateAllTargets(bucketName, meta.bucketTargetConfig)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *peerRESTServer) GetMetacacheListingHandler(opts *listPathOptions) (*metacache, *grid.RemoteErr) {
|
|
resp := localMetacacheMgr.getBucket(context.Background(), opts.Bucket).findCache(*opts)
|
|
return &resp, nil
|
|
}
|
|
|
|
func (s *peerRESTServer) UpdateMetacacheListingHandler(req *metacache) (*metacache, *grid.RemoteErr) {
|
|
cache, err := localMetacacheMgr.updateCacheEntry(*req)
|
|
if err != nil {
|
|
return nil, grid.NewRemoteErr(err)
|
|
}
|
|
return &cache, nil
|
|
}
|
|
|
|
// PutBucketNotificationHandler - Set bucket policy.
|
|
func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
bucketName := vars[peerRESTBucket]
|
|
if bucketName == "" {
|
|
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
|
|
return
|
|
}
|
|
|
|
var rulesMap event.RulesMap
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
err := gob.NewDecoder(r.Body).Decode(&rulesMap)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
globalEventNotifier.AddRulesMap(bucketName, rulesMap)
|
|
}
|
|
|
|
// HealthHandler - returns true of health
|
|
func (s *peerRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
|
|
s.IsValid(w, r)
|
|
}
|
|
|
|
// VerifyBinary - verifies the downloaded binary is in-tact
|
|
func (s *peerRESTServer) VerifyBinaryHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
u, err := url.Parse(r.Form.Get(peerRESTURL))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
sha256Sum, err := hex.DecodeString(r.Form.Get(peerRESTSha256Sum))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
releaseInfo := r.Form.Get(peerRESTReleaseInfo)
|
|
|
|
lrTime, err := releaseInfoToReleaseTime(releaseInfo)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
if lrTime.Sub(currentReleaseTime) <= 0 {
|
|
s.writeErrorResponse(w, fmt.Errorf("server is already running the latest version: %s", Version))
|
|
return
|
|
}
|
|
|
|
zr, err := zstd.NewReader(r.Body)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
defer zr.Close()
|
|
|
|
if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), zr); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// CommitBinary - overwrites the current binary with the new one.
|
|
func (s *peerRESTServer) CommitBinaryHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
if err := commitBinary(); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var errUnsupportedSignal = fmt.Errorf("unsupported signal")
|
|
|
|
func waitingDrivesNode() map[string]madmin.DiskMetrics {
|
|
globalLocalDrivesMu.RLock()
|
|
localDrives := cloneDrives(globalLocalDrives)
|
|
globalLocalDrivesMu.RUnlock()
|
|
|
|
errs := make([]error, len(localDrives))
|
|
infos := make([]DiskInfo, len(localDrives))
|
|
for i, drive := range localDrives {
|
|
infos[i], errs[i] = drive.DiskInfo(GlobalContext, DiskInfoOptions{})
|
|
}
|
|
infoMaps := make(map[string]madmin.DiskMetrics)
|
|
for i := range infos {
|
|
if infos[i].Metrics.TotalWaiting >= 1 && errors.Is(errs[i], errFaultyDisk) {
|
|
infoMaps[infos[i].Endpoint] = madmin.DiskMetrics{
|
|
TotalWaiting: infos[i].Metrics.TotalWaiting,
|
|
}
|
|
}
|
|
}
|
|
return infoMaps
|
|
}
|
|
|
|
// SignalServiceHandler - signal service handler.
|
|
func (s *peerRESTServer) SignalServiceHandler(vars *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
signalString := vars.Get(peerRESTSignal)
|
|
if signalString == "" {
|
|
return np, grid.NewRemoteErrString("signal name is missing")
|
|
}
|
|
si, err := strconv.Atoi(signalString)
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
signal := serviceSignal(si)
|
|
switch signal {
|
|
case serviceRestart, serviceStop:
|
|
dryRun := vars.Get("dry-run") == "true" // This is only supported for `restart/stop`
|
|
|
|
waitingDisks := waitingDrivesNode()
|
|
if len(waitingDisks) > 0 {
|
|
buf, err := json.Marshal(waitingDisks)
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
return np, grid.NewRemoteErrString(string(buf))
|
|
}
|
|
if !dryRun {
|
|
globalServiceSignalCh <- signal
|
|
}
|
|
case serviceFreeze:
|
|
freezeServices()
|
|
case serviceUnFreeze:
|
|
unfreezeServices()
|
|
case serviceReloadDynamic:
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
srvCfg, err := getValidConfig(objAPI)
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
subSys := vars.Get(peerRESTSubSys)
|
|
// Apply dynamic values.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if subSys == "" {
|
|
err = applyDynamicConfig(ctx, objAPI, srvCfg)
|
|
} else {
|
|
err = applyDynamicConfigForSubSys(ctx, objAPI, srvCfg, subSys)
|
|
}
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
default:
|
|
return np, grid.NewRemoteErr(errUnsupportedSignal)
|
|
}
|
|
return np, nil
|
|
}
|
|
|
|
// ListenHandler sends http trace messages back to peer rest client
|
|
func (s *peerRESTServer) ListenHandler(ctx context.Context, v *grid.URLValues, out chan<- *grid.Bytes) *grid.RemoteErr {
|
|
values := v.Values()
|
|
defer v.Recycle()
|
|
var prefix string
|
|
if len(values[peerRESTListenPrefix]) > 1 {
|
|
return grid.NewRemoteErrString("invalid request (peerRESTListenPrefix)")
|
|
}
|
|
globalAPIConfig.getRequestsPoolCapacity()
|
|
if len(values[peerRESTListenPrefix]) == 1 {
|
|
if err := event.ValidateFilterRuleValue(values[peerRESTListenPrefix][0]); err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
|
|
prefix = values[peerRESTListenPrefix][0]
|
|
}
|
|
|
|
var suffix string
|
|
if len(values[peerRESTListenSuffix]) > 1 {
|
|
return grid.NewRemoteErrString("invalid request (peerRESTListenSuffix)")
|
|
}
|
|
|
|
if len(values[peerRESTListenSuffix]) == 1 {
|
|
if err := event.ValidateFilterRuleValue(values[peerRESTListenSuffix][0]); err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
|
|
suffix = values[peerRESTListenSuffix][0]
|
|
}
|
|
|
|
pattern := event.NewPattern(prefix, suffix)
|
|
|
|
var eventNames []event.Name
|
|
var mask pubsub.Mask
|
|
for _, ev := range values[peerRESTListenEvents] {
|
|
eventName, err := event.ParseName(ev)
|
|
if err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
mask.MergeMaskable(eventName)
|
|
eventNames = append(eventNames, eventName)
|
|
}
|
|
|
|
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
|
|
|
|
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
|
ch := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity())
|
|
err := globalHTTPListen.Subscribe(mask, ch, ctx.Done(), func(ev event.Event) bool {
|
|
if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" {
|
|
if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) {
|
|
return false
|
|
}
|
|
}
|
|
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
|
|
})
|
|
if err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
|
|
// Process until remote disconnects.
|
|
// Blocks on upstream (out) congestion.
|
|
// We have however a dynamic downstream buffer (ch).
|
|
buf := bytes.NewBuffer(grid.GetByteBuffer())
|
|
enc := json.NewEncoder(buf)
|
|
tmpEvt := struct{ Records []event.Event }{[]event.Event{{}}}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
grid.PutByteBuffer(buf.Bytes())
|
|
return nil
|
|
case ev := <-ch:
|
|
buf.Reset()
|
|
tmpEvt.Records[0] = ev
|
|
if err := enc.Encode(tmpEvt); err != nil {
|
|
peersLogOnceIf(ctx, err, "event: Encode failed")
|
|
continue
|
|
}
|
|
out <- grid.NewBytesWithCopyOf(buf.Bytes())
|
|
}
|
|
}
|
|
}
|
|
|
|
// TraceHandler sends http trace messages back to peer rest client
|
|
func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-chan []byte, out chan<- []byte) *grid.RemoteErr {
|
|
var traceOpts madmin.ServiceTraceOpts
|
|
err := json.Unmarshal(payload, &traceOpts)
|
|
if err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
var wg sync.WaitGroup
|
|
|
|
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
|
err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), out, ctx.Done(), func(entry madmin.TraceInfo) bool {
|
|
return shouldTrace(entry, traceOpts)
|
|
}, &wg)
|
|
if err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
|
|
// Publish bootstrap events that have already occurred before client could subscribe.
|
|
if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) {
|
|
go globalBootstrapTracer.Publish(ctx, globalTrace)
|
|
}
|
|
|
|
// Wait for remote to cancel and SubscribeJSON to exit.
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (s *peerRESTServer) BackgroundHealStatusHandler(_ *grid.MSS) (*grid.JSON[madmin.BgHealState], *grid.RemoteErr) {
|
|
state, ok := getLocalBackgroundHealStatus(context.Background(), newObjectLayerFn())
|
|
if !ok {
|
|
return nil, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
return madminBgHealState.NewJSONWith(&state), nil
|
|
}
|
|
|
|
// ReloadSiteReplicationConfigHandler - reloads site replication configuration from the disks
|
|
func (s *peerRESTServer) ReloadSiteReplicationConfigHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
peersLogIf(context.Background(), globalSiteReplicationSys.Init(context.Background(), objAPI))
|
|
return
|
|
}
|
|
|
|
func (s *peerRESTServer) ReloadPoolMetaHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
pools, ok := objAPI.(*erasureServerPools)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if err := pools.ReloadPoolMeta(context.Background()); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *peerRESTServer) StopRebalanceHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
pools, ok := objAPI.(*erasureServerPools)
|
|
if !ok {
|
|
return np, grid.NewRemoteErr(errors.New("not a pooled setup"))
|
|
}
|
|
|
|
pools.StopRebalance()
|
|
return
|
|
}
|
|
|
|
func (s *peerRESTServer) LoadRebalanceMetaHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
pools, ok := objAPI.(*erasureServerPools)
|
|
if !ok {
|
|
return np, grid.NewRemoteErr(errors.New("not a pooled setup"))
|
|
}
|
|
|
|
startRebalance, err := strconv.ParseBool(mss.Get(peerRESTStartRebalance))
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
if err := pools.loadRebalanceMeta(context.Background()); err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
if startRebalance {
|
|
go pools.StartRebalance()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *peerRESTServer) LoadTransitionTierConfigHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return np, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
go func() {
|
|
err := globalTierConfigMgr.Reload(context.Background(), newObjectLayerFn())
|
|
if err != nil {
|
|
peersLogIf(context.Background(), fmt.Errorf("Failed to reload remote tier config %s", err))
|
|
}
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
// ConsoleLogHandler sends console logs of this node back to peer rest client
|
|
func (s *peerRESTServer) ConsoleLogHandler(ctx context.Context, params *grid.MSS, out chan<- *grid.Bytes) *grid.RemoteErr {
|
|
mask, err := strconv.Atoi(params.Get(peerRESTLogMask))
|
|
if err != nil {
|
|
mask = int(madmin.LogMaskAll)
|
|
}
|
|
ch := make(chan log.Info, 1000)
|
|
err = globalConsoleSys.Subscribe(ch, ctx.Done(), "", 0, madmin.LogMask(mask), nil)
|
|
if err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
var buf bytes.Buffer
|
|
enc := json.NewEncoder(&buf)
|
|
for {
|
|
select {
|
|
case entry, ok := <-ch:
|
|
if !ok {
|
|
return grid.NewRemoteErrString("console log channel closed")
|
|
}
|
|
if !entry.SendLog("", madmin.LogMask(mask)) {
|
|
continue
|
|
}
|
|
buf.Reset()
|
|
if err := enc.Encode(entry); err != nil {
|
|
return grid.NewRemoteErr(err)
|
|
}
|
|
out <- grid.NewBytesWithCopyOf(buf.Bytes())
|
|
case <-ctx.Done():
|
|
return grid.NewRemoteErr(ctx.Err())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
w.Write([]byte(err.Error()))
|
|
}
|
|
|
|
// IsValid - To authenticate and verify the time difference.
|
|
func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
|
|
if err := storageServerRequestValidate(r); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// GetBandwidth gets the bandwidth for the buckets requested.
|
|
func (s *peerRESTServer) GetBandwidth(params *grid.URLValues) (*bandwidth.BucketBandwidthReport, *grid.RemoteErr) {
|
|
buckets := params.Values().Get("buckets")
|
|
selectBuckets := b.SelectBuckets(buckets)
|
|
return globalBucketMonitor.GetReport(selectBuckets), nil
|
|
}
|
|
|
|
func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
|
|
res := make([]*MetricV2, 0, len(resourceMetricsGroups))
|
|
populateAndPublish(resourceMetricsGroups, func(m MetricV2) bool {
|
|
if m.VariableLabels == nil {
|
|
m.VariableLabels = make(map[string]string, 1)
|
|
}
|
|
m.VariableLabels[serverName] = globalLocalNodeName
|
|
res = append(res, &m)
|
|
return true
|
|
})
|
|
return aoMetricsGroup.NewWith(res), nil
|
|
}
|
|
|
|
// GetPeerMetrics gets the metrics to be federated across peers.
|
|
func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
|
|
res := make([]*MetricV2, 0, len(peerMetricsGroups))
|
|
populateAndPublish(peerMetricsGroups, func(m MetricV2) bool {
|
|
if m.VariableLabels == nil {
|
|
m.VariableLabels = make(map[string]string, 1)
|
|
}
|
|
m.VariableLabels[serverName] = globalLocalNodeName
|
|
res = append(res, &m)
|
|
return true
|
|
})
|
|
return aoMetricsGroup.NewWith(res), nil
|
|
}
|
|
|
|
// GetPeerBucketMetrics gets the metrics to be federated across peers.
|
|
func (s *peerRESTServer) GetPeerBucketMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
|
|
res := make([]*MetricV2, 0, len(bucketPeerMetricsGroups))
|
|
populateAndPublish(bucketPeerMetricsGroups, func(m MetricV2) bool {
|
|
if m.VariableLabels == nil {
|
|
m.VariableLabels = make(map[string]string, 1)
|
|
}
|
|
m.VariableLabels[serverName] = globalLocalNodeName
|
|
res = append(res, &m)
|
|
return true
|
|
})
|
|
return aoMetricsGroup.NewWith(res), nil
|
|
}
|
|
|
|
func (s *peerRESTServer) SpeedTestHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("invalid request"))
|
|
return
|
|
}
|
|
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
s.writeErrorResponse(w, errServerNotInitialized)
|
|
return
|
|
}
|
|
|
|
sizeStr := r.Form.Get(peerRESTSize)
|
|
durationStr := r.Form.Get(peerRESTDuration)
|
|
concurrentStr := r.Form.Get(peerRESTConcurrent)
|
|
storageClass := r.Form.Get(peerRESTStorageClass)
|
|
bucketName := r.Form.Get(peerRESTBucket)
|
|
enableSha256 := r.Form.Get(peerRESTEnableSha256) == "true"
|
|
|
|
size, err := strconv.Atoi(sizeStr)
|
|
if err != nil {
|
|
size = 64 * humanize.MiByte
|
|
}
|
|
|
|
concurrent, err := strconv.Atoi(concurrentStr)
|
|
if err != nil {
|
|
concurrent = 32
|
|
}
|
|
|
|
duration, err := time.ParseDuration(durationStr)
|
|
if err != nil {
|
|
duration = time.Second * 10
|
|
}
|
|
|
|
done := keepHTTPResponseAlive(w)
|
|
|
|
result, err := selfSpeedTest(r.Context(), speedTestOpts{
|
|
objectSize: size,
|
|
concurrency: concurrent,
|
|
duration: duration,
|
|
storageClass: storageClass,
|
|
bucketName: bucketName,
|
|
enableSha256: enableSha256,
|
|
})
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
}
|
|
|
|
done(nil)
|
|
peersLogIf(r.Context(), gob.NewEncoder(w).Encode(result))
|
|
}
|
|
|
|
// GetLastDayTierStatsHandler - returns per-tier stats in the last 24hrs for this server
|
|
func (s *peerRESTServer) GetLastDayTierStatsHandler(_ *grid.MSS) (*DailyAllTierStats, *grid.RemoteErr) {
|
|
if objAPI := newObjectLayerFn(); objAPI == nil || globalTransitionState == nil {
|
|
return nil, grid.NewRemoteErr(errServerNotInitialized)
|
|
}
|
|
|
|
result := globalTransitionState.getDailyAllTierStats()
|
|
return &result, nil
|
|
}
|
|
|
|
func (s *peerRESTServer) DriveSpeedTestHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("invalid request"))
|
|
return
|
|
}
|
|
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
s.writeErrorResponse(w, errServerNotInitialized)
|
|
return
|
|
}
|
|
|
|
serial := r.Form.Get("serial") == "true"
|
|
blockSizeStr := r.Form.Get("blocksize")
|
|
fileSizeStr := r.Form.Get("filesize")
|
|
|
|
blockSize, err := strconv.ParseUint(blockSizeStr, 10, 64)
|
|
if err != nil {
|
|
blockSize = 4 * humanize.MiByte // default value
|
|
}
|
|
|
|
fileSize, err := strconv.ParseUint(fileSizeStr, 10, 64)
|
|
if err != nil {
|
|
fileSize = 1 * humanize.GiByte // default value
|
|
}
|
|
|
|
opts := madmin.DriveSpeedTestOpts{
|
|
Serial: serial,
|
|
BlockSize: blockSize,
|
|
FileSize: fileSize,
|
|
}
|
|
|
|
done := keepHTTPResponseAlive(w)
|
|
result := driveSpeedTest(r.Context(), opts)
|
|
done(nil)
|
|
|
|
peersLogIf(r.Context(), gob.NewEncoder(w).Encode(result))
|
|
}
|
|
|
|
// GetReplicationMRFHandler - returns replication MRF for bucket
|
|
func (s *peerRESTServer) GetReplicationMRFHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("invalid request"))
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
bucketName := vars[peerRESTBucket]
|
|
ctx := newContext(r, w, "GetReplicationMRF")
|
|
re, err := globalReplicationPool.getMRF(ctx, bucketName)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
enc := gob.NewEncoder(w)
|
|
|
|
for m := range re {
|
|
if err := enc.Encode(m); err != nil {
|
|
s.writeErrorResponse(w, errors.New("Encoding mrf failed: "+err.Error()))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// DevNull - everything goes to io.Discard
|
|
func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("invalid request"))
|
|
return
|
|
}
|
|
|
|
globalNetPerfRX.Connect()
|
|
defer globalNetPerfRX.Disconnect()
|
|
|
|
connectTime := time.Now()
|
|
ctx := newContext(r, w, "DevNull")
|
|
for {
|
|
n, err := io.CopyN(xioutil.Discard, r.Body, 128*humanize.KiByte)
|
|
atomic.AddUint64(&globalNetPerfRX.RX, uint64(n))
|
|
if err != nil && err != io.EOF {
|
|
// If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec)
|
|
// would mean the network is not stable. Logging here will help in debugging network issues.
|
|
if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) {
|
|
peersLogIf(ctx, err)
|
|
}
|
|
}
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// NetSpeedTestHandlers - perform network speedtest
|
|
func (s *peerRESTServer) NetSpeedTestHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
s.writeErrorResponse(w, errors.New("invalid request"))
|
|
return
|
|
}
|
|
|
|
durationStr := r.Form.Get(peerRESTDuration)
|
|
duration, err := time.ParseDuration(durationStr)
|
|
if err != nil || duration.Seconds() == 0 {
|
|
duration = time.Second * 10
|
|
}
|
|
result := netperf(r.Context(), duration.Round(time.Second))
|
|
peersLogIf(r.Context(), gob.NewEncoder(w).Encode(result))
|
|
}
|
|
|
|
func (s *peerRESTServer) HealBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
bucket := mss.Get(peerS3Bucket)
|
|
if isMinioMetaBucket(bucket) {
|
|
return np, grid.NewRemoteErr(errInvalidArgument)
|
|
}
|
|
|
|
bucketDeleted := mss.Get(peerS3BucketDeleted) == "true"
|
|
_, err := healBucketLocal(context.Background(), bucket, madmin.HealOpts{
|
|
Remove: bucketDeleted,
|
|
})
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return np, nil
|
|
}
|
|
|
|
func (s *peerRESTServer) ListBucketsHandler(opts *BucketOptions) (*grid.Array[*BucketInfo], *grid.RemoteErr) {
|
|
buckets, err := listBucketsLocal(context.Background(), *opts)
|
|
if err != nil {
|
|
return nil, grid.NewRemoteErr(err)
|
|
}
|
|
res := aoBucketInfo.New()
|
|
for i := range buckets {
|
|
bucket := buckets[i]
|
|
res.Append(&bucket)
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// HeadBucketHandler implements peer BucketInfo call, returns bucket create date.
|
|
func (s *peerRESTServer) HeadBucketHandler(mss *grid.MSS) (info *VolInfo, nerr *grid.RemoteErr) {
|
|
bucket := mss.Get(peerS3Bucket)
|
|
if isMinioMetaBucket(bucket) {
|
|
return info, grid.NewRemoteErr(errInvalidArgument)
|
|
}
|
|
|
|
bucketDeleted := mss.Get(peerS3BucketDeleted) == "true"
|
|
|
|
bucketInfo, err := getBucketInfoLocal(context.Background(), bucket, BucketOptions{
|
|
Deleted: bucketDeleted,
|
|
})
|
|
if err != nil {
|
|
return info, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return &VolInfo{
|
|
Name: bucketInfo.Name,
|
|
Created: bucketInfo.Created,
|
|
}, nil
|
|
}
|
|
|
|
// DeleteBucketHandler implements peer delete bucket call.
|
|
func (s *peerRESTServer) DeleteBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
bucket := mss.Get(peerS3Bucket)
|
|
if isMinioMetaBucket(bucket) {
|
|
return np, grid.NewRemoteErr(errInvalidArgument)
|
|
}
|
|
|
|
forceDelete := mss.Get(peerS3BucketForceDelete) == "true"
|
|
err := deleteBucketLocal(context.Background(), bucket, DeleteBucketOptions{
|
|
Force: forceDelete,
|
|
})
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return np, nil
|
|
}
|
|
|
|
// MakeBucketHandler implements peer create bucket call.
|
|
func (s *peerRESTServer) MakeBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
|
|
bucket := mss.Get(peerS3Bucket)
|
|
if isMinioMetaBucket(bucket) {
|
|
return np, grid.NewRemoteErr(errInvalidArgument)
|
|
}
|
|
|
|
forceCreate := mss.Get(peerS3BucketForceCreate) == "true"
|
|
err := makeBucketLocal(context.Background(), bucket, MakeBucketOptions{
|
|
ForceCreate: forceCreate,
|
|
})
|
|
if err != nil {
|
|
return np, grid.NewRemoteErr(err)
|
|
}
|
|
|
|
return np, nil
|
|
}
|
|
|
|
// registerPeerRESTHandlers - register peer rest router.
|
|
func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) {
|
|
h := func(f http.HandlerFunc) http.HandlerFunc {
|
|
return collectInternodeStats(httpTraceHdrs(f))
|
|
}
|
|
|
|
server := &peerRESTServer{}
|
|
subrouter := router.PathPrefix(peerRESTPrefix).Subrouter()
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodHealth).HandlerFunc(h(server.HealthHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodVerifyBinary).HandlerFunc(h(server.VerifyBinaryHandler)).Queries(restQueries(peerRESTURL, peerRESTSha256Sum, peerRESTReleaseInfo)...)
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCommitBinary).HandlerFunc(h(server.CommitBinaryHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetReplicationMRF).HandlerFunc(httpTraceHdrs(server.GetReplicationMRFHandler)).Queries(restQueries(peerRESTBucket)...)
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(h(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(h(server.DownloadProfilingDataHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedTest).HandlerFunc(h(server.SpeedTestHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(h(server.DriveSpeedTestHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetperf).HandlerFunc(h(server.NetSpeedTestHandler))
|
|
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(h(server.DevNull))
|
|
|
|
logger.FatalIf(consoleLogRPC.RegisterNoInput(gm, server.ConsoleLogHandler), "unable to register handler")
|
|
logger.FatalIf(deleteBucketMetadataRPC.Register(gm, server.DeleteBucketMetadataHandler), "unable to register handler")
|
|
logger.FatalIf(deleteBucketRPC.Register(gm, server.DeleteBucketHandler), "unable to register handler")
|
|
logger.FatalIf(deletePolicyRPC.Register(gm, server.DeletePolicyHandler), "unable to register handler")
|
|
logger.FatalIf(deleteSvcActRPC.Register(gm, server.DeleteServiceAccountHandler), "unable to register handler")
|
|
logger.FatalIf(deleteUserRPC.Register(gm, server.DeleteUserHandler), "unable to register handler")
|
|
logger.FatalIf(getAllBucketStatsRPC.Register(gm, server.GetAllBucketStatsHandler), "unable to register handler")
|
|
logger.FatalIf(getBackgroundHealStatusRPC.Register(gm, server.BackgroundHealStatusHandler), "unable to register handler")
|
|
logger.FatalIf(getBandwidthRPC.Register(gm, server.GetBandwidth), "unable to register handler")
|
|
logger.FatalIf(getBucketStatsRPC.Register(gm, server.GetBucketStatsHandler), "unable to register handler")
|
|
logger.FatalIf(getCPUsHandler.Register(gm, server.GetCPUsHandler), "unable to register handler")
|
|
logger.FatalIf(getLastDayTierStatsRPC.Register(gm, server.GetLastDayTierStatsHandler), "unable to register handler")
|
|
logger.FatalIf(getLocksRPC.Register(gm, server.GetLocksHandler), "unable to register handler")
|
|
logger.FatalIf(getMemInfoRPC.Register(gm, server.GetMemInfoHandler), "unable to register handler")
|
|
logger.FatalIf(getMetacacheListingRPC.Register(gm, server.GetMetacacheListingHandler), "unable to register handler")
|
|
logger.FatalIf(getMetricsRPC.Register(gm, server.GetMetricsHandler), "unable to register handler")
|
|
logger.FatalIf(getNetInfoRPC.Register(gm, server.GetNetInfoHandler), "unable to register handler")
|
|
logger.FatalIf(getOSInfoRPC.Register(gm, server.GetOSInfoHandler), "unable to register handler")
|
|
logger.FatalIf(getPartitionsRPC.Register(gm, server.GetPartitionsHandler), "unable to register handler")
|
|
logger.FatalIf(getPeerBucketMetricsRPC.Register(gm, server.GetPeerBucketMetrics), "unable to register handler")
|
|
logger.FatalIf(getPeerMetricsRPC.Register(gm, server.GetPeerMetrics), "unable to register handler")
|
|
logger.FatalIf(getProcInfoRPC.Register(gm, server.GetProcInfoHandler), "unable to register handler")
|
|
logger.FatalIf(getResourceMetricsRPC.Register(gm, server.GetResourceMetrics), "unable to register handler")
|
|
logger.FatalIf(getSRMetricsRPC.Register(gm, server.GetSRMetricsHandler), "unable to register handler")
|
|
logger.FatalIf(getSysConfigRPC.Register(gm, server.GetSysConfigHandler), "unable to register handler")
|
|
logger.FatalIf(getSysErrorsRPC.Register(gm, server.GetSysErrorsHandler), "unable to register handler")
|
|
logger.FatalIf(getSysServicesRPC.Register(gm, server.GetSysServicesHandler), "unable to register handler")
|
|
logger.FatalIf(headBucketRPC.Register(gm, server.HeadBucketHandler), "unable to register handler")
|
|
logger.FatalIf(healBucketRPC.Register(gm, server.HealBucketHandler), "unable to register handler")
|
|
logger.FatalIf(listBucketsRPC.Register(gm, server.ListBucketsHandler), "unable to register handler")
|
|
logger.FatalIf(listenRPC.RegisterNoInput(gm, server.ListenHandler), "unable to register handler")
|
|
logger.FatalIf(loadBucketMetadataRPC.Register(gm, server.LoadBucketMetadataHandler), "unable to register handler")
|
|
logger.FatalIf(loadGroupRPC.Register(gm, server.LoadGroupHandler), "unable to register handler")
|
|
logger.FatalIf(loadPolicyMappingRPC.Register(gm, server.LoadPolicyMappingHandler), "unable to register handler")
|
|
logger.FatalIf(loadPolicyRPC.Register(gm, server.LoadPolicyHandler), "unable to register handler")
|
|
logger.FatalIf(loadRebalanceMetaRPC.Register(gm, server.LoadRebalanceMetaHandler), "unable to register handler")
|
|
logger.FatalIf(loadSvcActRPC.Register(gm, server.LoadServiceAccountHandler), "unable to register handler")
|
|
logger.FatalIf(loadTransitionTierConfigRPC.Register(gm, server.LoadTransitionTierConfigHandler), "unable to register handler")
|
|
logger.FatalIf(loadUserRPC.Register(gm, server.LoadUserHandler), "unable to register handler")
|
|
logger.FatalIf(localStorageInfoRPC.Register(gm, server.LocalStorageInfoHandler), "unable to register handler")
|
|
logger.FatalIf(makeBucketRPC.Register(gm, server.MakeBucketHandler), "unable to register handler")
|
|
logger.FatalIf(reloadPoolMetaRPC.Register(gm, server.ReloadPoolMetaHandler), "unable to register handler")
|
|
logger.FatalIf(reloadSiteReplicationConfigRPC.Register(gm, server.ReloadSiteReplicationConfigHandler), "unable to register handler")
|
|
logger.FatalIf(serverInfoRPC.Register(gm, server.ServerInfoHandler), "unable to register handler")
|
|
logger.FatalIf(signalServiceRPC.Register(gm, server.SignalServiceHandler), "unable to register handler")
|
|
logger.FatalIf(stopRebalanceRPC.Register(gm, server.StopRebalanceHandler), "unable to register handler")
|
|
logger.FatalIf(updateMetacacheListingRPC.Register(gm, server.UpdateMetacacheListingHandler), "unable to register handler")
|
|
|
|
logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerTrace, grid.StreamHandler{
|
|
Handle: server.TraceHandler,
|
|
Subroute: "",
|
|
OutCapacity: 100000,
|
|
InCapacity: 0,
|
|
}), "unable to register handler")
|
|
}
|