From eb69c4f946ce10f5996566f9ee4c550a3119aeae Mon Sep 17 00:00:00 2001 From: kannappanr <30541348+kannappanr@users.noreply.github.com> Date: Thu, 14 Mar 2019 16:27:31 -0700 Subject: [PATCH] Use REST api for inter node communication (#7205) --- cmd/notification.go | 255 ++++--- ...t-target.go => peer-rest-client-target.go} | 22 +- cmd/peer-rest-client.go | 425 ++++++++++++ cmd/peer-rest-common.go | 48 ++ cmd/peer-rest-server.go | 621 ++++++++++++++++++ cmd/peer-rpc-client.go | 261 -------- cmd/peer-rpc-server.go | 419 ------------ cmd/routers.go | 5 +- cmd/service.go | 8 +- 9 files changed, 1258 insertions(+), 806 deletions(-) rename cmd/{peer-rpc-client-target.go => peer-rest-client-target.go} (59%) create mode 100644 cmd/peer-rest-client.go create mode 100644 cmd/peer-rest-common.go create mode 100644 cmd/peer-rest-server.go delete mode 100644 cmd/peer-rpc-client.go delete mode 100644 cmd/peer-rpc-server.go diff --git a/cmd/notification.go b/cmd/notification.go index 666bfbee1..7366370cc 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -43,7 +43,7 @@ type NotificationSys struct { targetList *event.TargetList bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap - peerRPCClientMap map[xnet.Host]*PeerRPCClient + peerClients []*peerRESTClient } // GetARNList - returns available ARNs. @@ -63,11 +63,6 @@ func (sys *NotificationSys) GetARNList() []string { return arns } -// GetPeerRPCClient - returns PeerRPCClient of addr. -func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient { - return sys.peerRPCClientMap[addr] -} - // NotificationPeerErr returns error associated for a remote peer. type NotificationPeerErr struct { Host xnet.Host // Remote host on which the rpc call was initiated @@ -78,15 +73,15 @@ type NotificationPeerErr struct { func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { go func() { var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for _, client := range sys.peerClients { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(client *peerRESTClient) { defer wg.Done() if err := client.DeleteBucket(bucketName); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) logger.LogIf(ctx, err) } - }(addr, client) + }(client) } wg.Wait() }() @@ -146,52 +141,58 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a }() } -// ReloadFormat - calls ReloadFormat RPC call on all peers. +// ReloadFormat - calls ReloadFormat REST call on all peers. func (sys *NotificationSys) ReloadFormat(dryRun bool) []NotificationPeerErr { - var idx = 0 - ng := WithNPeers(len(sys.peerRPCClientMap)) - for addr, client := range sys.peerRPCClientMap { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } client := client ng.Go(context.Background(), func() error { return client.ReloadFormat(dryRun) - }, idx, addr) - idx++ + }, idx, *client.host) } return ng.Wait() } // LoadUsers - calls LoadUsers RPC call on all peers. func (sys *NotificationSys) LoadUsers() []NotificationPeerErr { - var idx = 0 - ng := WithNPeers(len(sys.peerRPCClientMap)) - for addr, client := range sys.peerRPCClientMap { - ng.Go(context.Background(), client.LoadUsers, idx, addr) - idx++ + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(context.Background(), client.LoadUsers, idx, *client.host) } return ng.Wait() } // LoadCredentials - calls LoadCredentials RPC call on all peers. func (sys *NotificationSys) LoadCredentials() []NotificationPeerErr { - var idx = 0 - ng := WithNPeers(len(sys.peerRPCClientMap)) - for addr, client := range sys.peerRPCClientMap { - ng.Go(context.Background(), client.LoadCredentials, idx, addr) - idx++ + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(context.Background(), client.LoadCredentials, idx, *client.host) } return ng.Wait() } // StartProfiling - start profiling on remote peers, by initiating a remote RPC. func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr { - var idx = 0 - ng := WithNPeers(len(sys.peerRPCClientMap)) - for addr, client := range sys.peerRPCClientMap { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } client := client ng.Go(context.Background(), func() error { return client.StartProfiling(profiler) - }, idx, addr) - idx++ + }, idx, *client.host) } return ng.Wait() } @@ -205,10 +206,13 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io zipWriter := zip.NewWriter(writer) defer zipWriter.Close() - for addr, client := range sys.peerRPCClientMap { - data, err := client.DownloadProfilingData() + for _, client := range sys.peerClients { + if client == nil { + continue + } + data, err := client.DownloadProfileData() if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, err) continue @@ -218,7 +222,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io // Send profiling data to zip as file header, zerr := zip.FileInfoHeader(dummyFileInfo{ - name: fmt.Sprintf("profiling-%s.pprof", addr), + name: fmt.Sprintf("profiling-%s.pprof", client.host.String()), size: int64(len(data)), mode: 0600, modTime: UTCNow(), @@ -226,20 +230,20 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io sys: nil, }) if zerr != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, zerr) continue } zwriter, zerr := zipWriter.CreateHeader(header) if zerr != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, zerr) continue } if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, err) continue @@ -289,45 +293,48 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io // SignalService - calls signal service RPC call on all peers. func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr { - var idx = 0 - ng := WithNPeers(len(sys.peerRPCClientMap)) - for addr, client := range sys.peerRPCClientMap { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } client := client ng.Go(context.Background(), func() error { return client.SignalService(sig) - }, idx, addr) - idx++ + }, idx, *client.host) } return ng.Wait() } // ServerInfo - calls ServerInfo RPC call on all peers. func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { - var idx = 0 - serverInfo := make([]ServerInfo, len(sys.peerRPCClientMap)) + serverInfo := make([]ServerInfo, len(sys.peerClients)) var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for index, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { + go func(idx int, client *peerRESTClient) { defer wg.Done() // Try to fetch serverInfo remotely in three attempts. for i := 0; i < 3; i++ { info, err := client.ServerInfo() if err == nil { serverInfo[idx] = ServerInfo{ - Addr: addr.String(), + Addr: client.host.String(), Data: &info, } return } serverInfo[idx] = ServerInfo{ - Addr: addr.String(), + Addr: client.host.String(), Data: &info, Error: err.Error(), } // Last iteration log the error. if i == 2 { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, err) } @@ -336,8 +343,7 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { time.Sleep(1 * time.Second) } } - }(idx, addr, client) - idx++ + }(index, client) } wg.Wait() return serverInfo @@ -345,19 +351,22 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { // GetLocks - makes GetLocks RPC call on all peers. func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { - var idx = 0 - locksResp := make([]*PeerLocks, len(sys.peerRPCClientMap)) + + locksResp := make([]*PeerLocks, len(sys.peerClients)) var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for index, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(idx int, addr xnet.Host, client *PeerRPCClient) { + go func(idx int, client *peerRESTClient) { defer wg.Done() // Try to fetch serverInfo remotely in three attempts. for i := 0; i < 3; i++ { serverLocksResp, err := client.GetLocks() if err == nil { locksResp[idx] = &PeerLocks{ - Addr: addr.String(), + Addr: client.host.String(), Locks: serverLocksResp, } return @@ -365,17 +374,16 @@ func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { // Last iteration log the error. if i == 2 { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogOnceIf(ctx, err, addr.String()) + logger.LogOnceIf(ctx, err, client.host.String()) } // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } } - }(idx, addr, client) - idx++ + }(index, client) } wg.Wait() return locksResp @@ -385,15 +393,18 @@ func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) { go func() { var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for _, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(client *peerRESTClient) { defer wg.Done() if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) logger.LogIf(ctx, err) } - }(addr, client) + }(client) } wg.Wait() }() @@ -403,15 +414,18 @@ func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName stri func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) { go func() { var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for _, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(client *peerRESTClient) { defer wg.Done() if err := client.RemoveBucketPolicy(bucketName); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) logger.LogIf(ctx, err) } - }(addr, client) + }(client) } wg.Wait() }() @@ -421,15 +435,18 @@ func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName s func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) { go func() { var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for _, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { + go func(client *peerRESTClient, rulesMap event.RulesMap) { defer wg.Done() if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) logger.LogIf(ctx, err) } - }(addr, client, rulesMap.Clone()) + }(client, rulesMap.Clone()) } wg.Wait() }() @@ -440,15 +457,18 @@ func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucket targetID event.TargetID, localPeer xnet.Host) { go func() { var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { + for _, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(client *peerRESTClient) { defer wg.Done() if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) logger.LogIf(ctx, err) } - }(addr, client) + }(client) } wg.Wait() }() @@ -492,7 +512,17 @@ func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event. return ok } -// initListeners - initializes PeerRPC clients available in listener.json. +// ListenBucketNotificationArgs - listen bucket notification RPC arguments. +type ListenBucketNotificationArgs struct { + AuthArgs `json:"-"` + BucketName string `json:"-"` + EventNames []event.Name `json:"eventNames"` + Pattern string `json:"pattern"` + TargetID event.TargetID `json:"targetId"` + Addr xnet.Host `json:"addr"` +} + +// initListeners - initializes PeerREST clients available in listener.json. func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error { // listener.json is available/applicable only in DistXL mode. if !globalIsDistXL { @@ -542,12 +572,12 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye continue } - rpcClient := sys.GetPeerRPCClient(args.Addr) - if rpcClient == nil { - return fmt.Errorf("unable to find PeerRPCClient by address %v in listener.json for bucket %v", args.Addr, bucketName) + client, err := newPeerRESTClient(&args.Addr) + if err != nil { + return fmt.Errorf("unable to find PeerHost by address %v in listener.json for bucket %v", args.Addr, bucketName) } - exist, err := rpcClient.RemoteTargetExist(bucketName, args.TargetID) + exist, err := client.RemoteTargetExist(bucketName, args.TargetID) if err != nil { logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name) logger.LogIf(ctx, err) @@ -558,7 +588,7 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye continue } - target := NewPeerRPCClientTarget(bucketName, args.TargetID, rpcClient) + target := NewPeerRESTClientTarget(bucketName, args.TargetID, client) rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name) @@ -721,24 +751,25 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { // DrivePerfInfo - Drive speed (read and write) information func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo { - reply := make([]ServerDrivesPerfInfo, len(sys.peerRPCClientMap)) + reply := make([]ServerDrivesPerfInfo, len(sys.peerClients)) var wg sync.WaitGroup - var i int - for addr, client := range sys.peerRPCClientMap { + for i, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient, idx int) { + go func(client *peerRESTClient, idx int) { defer wg.Done() di, err := client.DrivePerfInfo() if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) - di.Addr = addr.String() + di.Addr = client.host.String() di.Error = err.Error() } reply[idx] = di - }(addr, client, i) - i++ + }(client, i) } wg.Wait() return reply @@ -746,24 +777,25 @@ func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo { // MemUsageInfo - Mem utilization information func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo { - reply := make([]ServerMemUsageInfo, len(sys.peerRPCClientMap)) + reply := make([]ServerMemUsageInfo, len(sys.peerClients)) var wg sync.WaitGroup - var i int - for addr, client := range sys.peerRPCClientMap { + for i, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient, idx int) { + go func(client *peerRESTClient, idx int) { defer wg.Done() memi, err := client.MemUsageInfo() if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) - memi.Addr = addr.String() + memi.Addr = client.host.String() memi.Error = err.Error() } reply[idx] = memi - }(addr, client, i) - i++ + }(client, i) } wg.Wait() return reply @@ -771,24 +803,25 @@ func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo { // CPULoadInfo - CPU utilization information func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo { - reply := make([]ServerCPULoadInfo, len(sys.peerRPCClientMap)) + reply := make([]ServerCPULoadInfo, len(sys.peerClients)) var wg sync.WaitGroup - var i int - for addr, client := range sys.peerRPCClientMap { + for i, client := range sys.peerClients { + if client == nil { + continue + } wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient, idx int) { + go func(client *peerRESTClient, idx int) { defer wg.Done() cpui, err := client.CPULoadInfo() if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String()) + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) - cpui.Addr = addr.String() + cpui.Addr = client.host.String() cpui.Error = err.Error() } reply[idx] = cpui - }(addr, client, i) - i++ + }(client, i) } wg.Wait() return reply @@ -797,14 +830,18 @@ func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo { // NewNotificationSys - creates new notification system object. func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys { targetList := getNotificationTargets(config) - peerRPCClientMap := makeRemoteRPCClients(endpoints) + remoteHosts := getRemoteHosts(endpoints) + remoteClients, err := getRestClients(remoteHosts) + if err != nil { + logger.FatalIf(err, "Unable to start notification sub system") + } // bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init() return &NotificationSys{ targetList: targetList, bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), - peerRPCClientMap: peerRPCClientMap, + peerClients: remoteClients, } } diff --git a/cmd/peer-rpc-client-target.go b/cmd/peer-rest-client-target.go similarity index 59% rename from cmd/peer-rpc-client-target.go rename to cmd/peer-rest-client-target.go index 3d0996989..6b72d41bd 100644 --- a/cmd/peer-rpc-client-target.go +++ b/cmd/peer-rest-client-target.go @@ -18,35 +18,35 @@ package cmd import "github.com/minio/minio/pkg/event" -// PeerRPCClientTarget - RPCClient is an event.Target which sends event to target of remote peer. -type PeerRPCClientTarget struct { +// PeerRESTClientTarget - RPCClient is an event.Target which sends event to target of remote peer. +type PeerRESTClientTarget struct { id event.TargetID remoteTargetID event.TargetID - rpcClient *PeerRPCClient + restClient *peerRESTClient bucketName string } // ID - returns target ID. -func (target *PeerRPCClientTarget) ID() event.TargetID { +func (target *PeerRESTClientTarget) ID() event.TargetID { return target.id } // Send - sends event to remote peer by making RPC call. -func (target *PeerRPCClientTarget) Send(eventData event.Event) error { - return target.rpcClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData) +func (target *PeerRESTClientTarget) Send(eventData event.Event) error { + return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData) } // Close - does nothing and available for interface compatibility. -func (target *PeerRPCClientTarget) Close() error { +func (target *PeerRESTClientTarget) Close() error { return nil } -// NewPeerRPCClientTarget - creates RPCClient target with given target ID available in remote peer. -func NewPeerRPCClientTarget(bucketName string, targetID event.TargetID, rpcClient *PeerRPCClient) *PeerRPCClientTarget { - return &PeerRPCClientTarget{ +// NewPeerRESTClientTarget - creates RPCClient target with given target ID available in remote peer. +func NewPeerRESTClientTarget(bucketName string, targetID event.TargetID, restClient *peerRESTClient) *PeerRESTClientTarget { + return &PeerRESTClientTarget{ id: event.TargetID{ID: targetID.ID, Name: targetID.Name + "+" + mustGetUUID()}, remoteTargetID: targetID, bucketName: bucketName, - rpcClient: rpcClient, + restClient: restClient, } } diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go new file mode 100644 index 000000000..5e2dceb28 --- /dev/null +++ b/cmd/peer-rest-client.go @@ -0,0 +1,425 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/gob" + "io" + "net/url" + + "github.com/minio/minio/cmd/http" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/rest" + "github.com/minio/minio/pkg/event" + xnet "github.com/minio/minio/pkg/net" + "github.com/minio/minio/pkg/policy" +) + +// client to talk to peer Nodes. +type peerRESTClient struct { + host *xnet.Host + restClient *rest.Client + connected bool +} + +// Reconnect to a peer rest server. +func (client *peerRESTClient) reConnect() error { + // correct (intelligent) retry logic will be + // implemented in subsequent PRs. + client.connected = true + return nil +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + if !client.connected { + err := client.reConnect() + logger.LogIf(context.Background(), err) + if err != nil { + return nil, err + } + } + + if values == nil { + values = make(url.Values) + } + + respBody, err = client.restClient.Call(method, values, body, length) + if err == nil { + return respBody, nil + } + + if isNetworkError(err) { + client.connected = false + } + + return nil, err +} + +// Stringer provides a canonicalized representation of node. +func (client *peerRESTClient) String() string { + return client.host.String() +} + +// IsOnline - returns whether RPC client failed to connect or not. +func (client *peerRESTClient) IsOnline() bool { + return client.connected +} + +// Close - marks the client as closed. +func (client *peerRESTClient) Close() error { + client.connected = false + client.restClient.Close() + return nil +} + +// GetLocksResp stores various info from the client for each lock that is requested. +type GetLocksResp map[string][]lockRequesterInfo + +// GetLocks - fetch older locks for a remote node. +func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) { + respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&locks) + return locks, err +} + +// ServerInfo - fetch server information for a remote node. +func (client *peerRESTClient) ServerInfo() (info ServerInfoData, err error) { + respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// CPULoadInfo - fetch CPU information for a remote node. +func (client *peerRESTClient) CPULoadInfo() (info ServerCPULoadInfo, err error) { + respBody, err := client.call(peerRESTMethodCPULoadInfo, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// DrivePerfInfo - fetch Drive performance information for a remote node. +func (client *peerRESTClient) DrivePerfInfo() (info ServerDrivesPerfInfo, err error) { + respBody, err := client.call(peerRESTMethodDrivePerfInfo, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// MemUsageInfo - fetch memory usage information for a remote node. +func (client *peerRESTClient) MemUsageInfo() (info ServerMemUsageInfo, err error) { + respBody, err := client.call(peerRESTMethodMemUsageInfo, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// StartProfiling - Issues profiling command on the peer node. +func (client *peerRESTClient) StartProfiling(profiler string) error { + values := make(url.Values) + values.Set(peerRESTProfiler, profiler) + respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// DownloadProfileData - download profiled data from a remote node. +func (client *peerRESTClient) DownloadProfileData() (data []byte, err error) { + respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&data) + return data, err +} + +// DeleteBucket - Delete notification and policies related to the bucket. +func (client *peerRESTClient) DeleteBucket(bucket string) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + respBody, err := client.call(peerRESTMethodDeleteBucket, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// ReloadFormat - reload format on the peer node. +func (client *peerRESTClient) ReloadFormat(dryRun bool) error { + values := make(url.Values) + if dryRun { + values.Set(peerRESTDryRun, "true") + } else { + values.Set(peerRESTDryRun, "false") + } + + respBody, err := client.call(peerRESTMethodReloadFormat, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// ListenBucketNotification - send listent bucket notification to peer nodes. +func (client *peerRESTClient) ListenBucketNotification(bucket string, eventNames []event.Name, + pattern string, targetID event.TargetID, addr xnet.Host) error { + args := listenBucketNotificationReq{ + EventNames: eventNames, + Pattern: pattern, + TargetID: targetID, + Addr: addr, + } + + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(args) + if err != nil { + return err + } + + respBody, err := client.call(peerRESTMethodBucketNotificationListen, values, &reader, -1) + if err != nil { + return err + } + + defer http.DrainBody(respBody) + return nil +} + +// SendEvent - calls send event RPC. +func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { + args := sendEventRequest{ + TargetID: remoteTargetID, + Event: eventData, + } + + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(args) + if err != nil { + return err + } + respBody, err := client.call(peerRESTMethodSendEvent, values, &reader, -1) + if err != nil { + return err + } + + var eventResp sendEventResp + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&eventResp) + + if err != nil || !eventResp.Success { + reqInfo := &logger.ReqInfo{BucketName: bucket} + reqInfo.AppendTags("targetID", targetID.Name) + reqInfo.AppendTags("event", eventData.EventName.String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + globalNotificationSys.RemoveRemoteTarget(bucket, targetID) + } + + return err +} + +// RemoteTargetExist - calls remote target ID exist REST API. +func (client *peerRESTClient) RemoteTargetExist(bucket string, targetID event.TargetID) (bool, error) { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(targetID) + if err != nil { + return false, err + } + + respBody, err := client.call(peerRESTMethodTargetExists, values, &reader, -1) + if err != nil { + return false, err + } + defer http.DrainBody(respBody) + var targetExists remoteTargetExistsResp + err = gob.NewDecoder(respBody).Decode(&targetExists) + return targetExists.Exists, err +} + +// RemoveBucketPolicy - Remove bucket policy on the peer node. +func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + respBody, err := client.call(peerRESTMethodBucketPolicyRemove, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// SetBucketPolicy - Set bucket policy on the peer node. +func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(bucketPolicy) + if err != nil { + return err + } + + respBody, err := client.call(peerRESTMethodBucketPolicySet, values, &reader, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// PutBucketNotification - Put bucket notification on the peer node. +func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap event.RulesMap) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(&rulesMap) + if err != nil { + return err + } + + respBody, err := client.call(peerRESTMethodBucketNotificationPut, values, &reader, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +// LoadUsers - send load users command to peer nodes. +func (client *peerRESTClient) LoadUsers() (err error) { + respBody, err := client.call(peerRESTMethodLoadUsers, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + return nil +} + +// LoadCredentials - send load credentials command to peer nodes. +func (client *peerRESTClient) LoadCredentials() (err error) { + respBody, err := client.call(peerRESTMethodLoadCredentials, nil, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + return nil +} + +// SignalService - sends signal to peer nodes. +func (client *peerRESTClient) SignalService(sig serviceSignal) error { + values := make(url.Values) + values.Set(peerRESTSignal, string(sig)) + respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + +func getRemoteHosts(endpoints EndpointList) []*xnet.Host { + var remoteHosts []*xnet.Host + for _, hostStr := range GetRemotePeers(endpoints) { + host, err := xnet.ParseHost(hostStr) + logger.FatalIf(err, "Unable to parse peer Host") + remoteHosts = append(remoteHosts, host) + } + + return remoteHosts +} + +func getRestClients(peerHosts []*xnet.Host) ([]*peerRESTClient, error) { + restClients := make([]*peerRESTClient, len(peerHosts)) + for i, host := range peerHosts { + client, err := newPeerRESTClient(host) + if err != nil { + logger.LogIf(context.Background(), err) + } + restClients[i] = client + } + + return restClients, nil +} + +// Returns a peer rest client. +func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) { + + scheme := "http" + if globalIsSSL { + scheme = "https" + } + + serverURL := &url.URL{ + Scheme: scheme, + Host: peer.String(), + Path: peerRESTPath, + } + + var tlsConfig *tls.Config + if globalIsSSL { + tlsConfig = &tls.Config{ + ServerName: peer.String(), + RootCAs: globalRootCAs, + } + } + + restClient, err := rest.NewClient(serverURL, tlsConfig, rest.DefaultRESTTimeout, newAuthToken) + + if err != nil { + return &peerRESTClient{host: peer, restClient: restClient, connected: false}, err + } + + return &peerRESTClient{host: peer, restClient: restClient, connected: true}, nil +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go new file mode 100644 index 000000000..1c573525c --- /dev/null +++ b/cmd/peer-rest-common.go @@ -0,0 +1,48 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +const peerRESTVersion = "v1" +const peerRESTPath = minioReservedBucketPath + "/peer/" + peerRESTVersion + +const ( + peerRESTMethodServerInfo = "serverinfo" + peerRESTMethodCPULoadInfo = "cpuloadinfo" + peerRESTMethodMemUsageInfo = "memusageinfo" + peerRESTMethodDrivePerfInfo = "driveperfinfo" + peerRESTMethodDeleteBucket = "deletebucket" + peerRESTMethodSignalService = "signalservice" + peerRESTMethodGetLocks = "getlocks" + peerRESTMethodBucketPolicyRemove = "removebucketpolicy" + peerRESTMethodLoadUsers = "loadusers" + peerRESTMethodStartProfiling = "startprofiling" + peerRESTMethodDownloadProfilingData = "downloadprofilingdata" + peerRESTMethodLoadCredentials = "loadcredentials" + peerRESTMethodBucketPolicySet = "setbucketpolicy" + peerRESTMethodBucketNotificationPut = "putbucketnotification" + peerRESTMethodBucketNotificationListen = "listenbucketnotification" + peerRESTMethodReloadFormat = "reloadformat" + peerRESTMethodTargetExists = "targetexists" + peerRESTMethodSendEvent = "sendevent" +) + +const ( + peerRESTBucket = "bucket" + peerRESTSignal = "signal" + peerRESTProfiler = "profiler" + peerRESTDryRun = "dry-run" +) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go new file mode 100644 index 000000000..9f734d282 --- /dev/null +++ b/cmd/peer-rest-server.go @@ -0,0 +1,621 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "encoding/gob" + "errors" + "fmt" + "net/http" + "path" + "sort" + "strings" + "time" + + "github.com/gorilla/mux" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/event" + xnet "github.com/minio/minio/pkg/net" + "github.com/minio/minio/pkg/policy" +) + +// To abstract a node over network. +type peerRESTServer struct { +} + +func getServerInfo() (*ServerInfoData, error) { + if globalBootTime.IsZero() { + return nil, errServerNotInitialized + } + + objLayer := newObjectLayerFn() + if objLayer == nil { + return nil, errServerNotInitialized + } + // Server info data. + return &ServerInfoData{ + StorageInfo: objLayer.StorageInfo(context.Background()), + ConnStats: globalConnStats.toServerConnStats(), + HTTPStats: globalHTTPStats.toServerHTTPStats(), + Properties: ServerProperties{ + Uptime: UTCNow().Sub(globalBootTime), + Version: Version, + CommitID: CommitID, + SQSARN: globalNotificationSys.GetARNList(), + Region: globalServerConfig.GetRegion(), + }, + }, nil +} + +// uptimes - used to sort uptimes in chronological order. +type uptimes []time.Duration + +func (ts uptimes) Len() int { + return len(ts) +} + +func (ts uptimes) Less(i, j int) bool { + return ts[i] < ts[j] +} + +func (ts uptimes) Swap(i, j int) { + ts[i], ts[j] = ts[j], ts[i] +} + +// getPeerUptimes - returns the uptime. +func getPeerUptimes(serverInfo []ServerInfo) time.Duration { + // In a single node Erasure or FS backend setup the uptime of + // the setup is the uptime of the single minio server + // instance. + if !globalIsDistXL { + return UTCNow().Sub(globalBootTime) + } + + var times []time.Duration + + for _, info := range serverInfo { + if info.Error != "" { + continue + } + times = append(times, info.Data.Properties.Uptime) + } + + // Sort uptimes in chronological order. + sort.Sort(uptimes(times)) + + // Return the latest time as the uptime. + return times[0] +} + +// GetLocksHandler - returns list of older lock from the server. +func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "GetLocks") + locks := globalLockServer.ll.DupLockMap() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(locks)) + + w.(http.Flusher).Flush() + +} + +// LoadUsersHandler - returns server info. +func (s *peerRESTServer) LoadUsersHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + + err := globalIAMSys.Load(objAPI) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + w.(http.Flusher).Flush() +} + +// StartProfilingHandler - Issues the start profiling command. +func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + profiler := vars[peerRESTProfiler] + if profiler == "" { + s.writeErrorResponse(w, errors.New("profiler name is missing")) + return + } + + if globalProfiler != nil { + globalProfiler.Stop() + } + + var err error + globalProfiler, err = startProfiler(profiler, "") + if err != nil { + s.writeErrorResponse(w, err) + return + } + + w.(http.Flusher).Flush() +} + +// ServerInfoHandler - returns server info. +func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "ServerInfo") + info, err := getServerInfo() + if err != nil { + s.writeErrorResponse(w, err) + return + } + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) +} + +// DownloadProflingDataHandler - returns proflied data. +func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "DownloadProfiling") + profileData, err := getProfileData() + if err != nil { + s.writeErrorResponse(w, err) + return + } + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(profileData)) +} + +// CPULoadInfoHandler - returns CPU Load info. +func (s *peerRESTServer) CPULoadInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "CPULoadInfo") + info := localEndpointsCPULoad(globalEndpoints) + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) +} + +// DrivePerfInfoHandler - returns Drive Performance info. +func (s *peerRESTServer) DrivePerfInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "DrivePerfInfo") + info := localEndpointsDrivePerf(globalEndpoints) + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) +} + +// MemUsageInfoHandler - returns Memory Usage info. +func (s *peerRESTServer) MemUsageInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + ctx := newContext(r, w, "MemUsageInfo") + info := localEndpointsMemUsage(globalEndpoints) + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) +} + +// LoadCredentialsHandler - loads credentials. +func (s *peerRESTServer) LoadCredentialsHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + // Construct path to config.json for the given bucket. + configFile := path.Join(bucketConfigPrefix, minioConfigFile) + transactionConfigFile := configFile + ".transaction" + + // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket + // and configFile, take a transaction lock to avoid race. + objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) + if err := objLock.GetRLock(globalOperationTimeout); err != nil { + s.writeErrorResponse(w, err) + return + } + objLock.RUnlock() + + if err := globalConfigSys.Load(newObjectLayerFn()); err != nil { + s.writeErrorResponse(w, err) + return + } + + w.(http.Flusher).Flush() +} + +// DeleteBucketHandler - Delete notification and policies related to the bucket. +func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + globalNotificationSys.RemoveNotification(bucketName) + globalPolicySys.Remove(bucketName) + + w.(http.Flusher).Flush() +} + +// ReloadFormatHandler - Reload Format. +func (s *peerRESTServer) ReloadFormatHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + dryRunString := vars[peerRESTDryRun] + if dryRunString == "" { + s.writeErrorResponse(w, errors.New("dry run parameter is missing")) + return + } + + var dryRun bool + switch strings.ToLower(dryRunString) { + case "true": + dryRun = true + case "false": + dryRun = false + default: + s.writeErrorResponse(w, errInvalidArgument) + return + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + err := objAPI.ReloadFormat(context.Background(), dryRun) + if err != nil { + s.writeErrorResponse(w, err) + return + } + w.(http.Flusher).Flush() +} + +// RemoveBucketPolicyHandler - Remove bucket policy. +func (s *peerRESTServer) RemoveBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + globalPolicySys.Remove(bucketName) + w.(http.Flusher).Flush() +} + +// SetBucketPolicyHandler - Set bucket policy. +func (s *peerRESTServer) SetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + var policyData policy.Policy + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&policyData) + if err != nil { + s.writeErrorResponse(w, err) + return + } + globalPolicySys.Set(bucketName, policyData) + w.(http.Flusher).Flush() +} + +type remoteTargetExistsResp struct { + Exists bool +} + +// TargetExistsHandler - Check if Target exists. +func (s *peerRESTServer) TargetExistsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "TargetExists") + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + var targetID event.TargetID + if r.ContentLength <= 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&targetID) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + var targetExists remoteTargetExistsResp + targetExists.Exists = globalNotificationSys.RemoteTargetExist(bucketName, targetID) + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(&targetExists)) +} + +type sendEventRequest struct { + Event event.Event + TargetID event.TargetID +} + +type sendEventResp struct { + Success bool +} + +// SendEventHandler - Send Event. +func (s *peerRESTServer) SendEventHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "SendEvent") + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + var eventReq sendEventRequest + if r.ContentLength <= 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&eventReq) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + var eventResp sendEventResp + eventResp.Success = true + errs := globalNotificationSys.send(bucketName, eventReq.Event, eventReq.TargetID) + + for i := range errs { + reqInfo := (&logger.ReqInfo{}).AppendTags("Event", eventReq.Event.EventName.String()) + reqInfo.AppendTags("targetName", eventReq.TargetID.Name) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, errs[i].Err) + + eventResp.Success = false + s.writeErrorResponse(w, errs[i].Err) + return + } + logger.LogIf(ctx, gob.NewEncoder(w).Encode(&eventResp)) + w.(http.Flusher).Flush() +} + +// PutBucketNotificationHandler - Set bucket policy. +func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + var rulesMap event.RulesMap + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&rulesMap) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + globalNotificationSys.AddRulesMap(bucketName, rulesMap) + w.(http.Flusher).Flush() +} + +type listenBucketNotificationReq struct { + EventNames []event.Name `json:"eventNames"` + Pattern string `json:"pattern"` + TargetID event.TargetID `json:"targetId"` + Addr xnet.Host `json:"addr"` +} + +// ListenBucketNotificationHandler - Listen bucket notification handler. +func (s *peerRESTServer) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + var args listenBucketNotificationReq + if r.ContentLength <= 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&args) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + restClient, err := newPeerRESTClient(&args.Addr) + if err != nil { + s.writeErrorResponse(w, fmt.Errorf("unable to find PeerRESTClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)) + return + } + + target := NewPeerRESTClientTarget(bucketName, args.TargetID, restClient) + rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) + if err := globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { + reqInfo := &logger.ReqInfo{BucketName: target.bucketName} + reqInfo.AppendTags("target", target.id.Name) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + s.writeErrorResponse(w, err) + return + } + w.(http.Flusher).Flush() +} + +var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") + +// SignalServiceHandler - signal service handler. +func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + signalString := vars[peerRESTSignal] + if signalString == "" { + s.writeErrorResponse(w, errors.New("signal name is missing")) + return + } + signal := serviceSignal(signalString) + defer w.(http.Flusher).Flush() + switch signal { + case serviceRestart, serviceStop: + globalServiceSignalCh <- signal + default: + s.writeErrorResponse(w, errUnsupportedSignal) + return + } +} + +func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte(err.Error())) +} + +// IsValid - To authenticate and verify the time difference. +func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { + if err := storageServerRequestValidate(r); err != nil { + s.writeErrorResponse(w, err) + return false + } + return true +} + +// registerPeerRESTHandlers - register peer rest router. +func registerPeerRESTHandlers(router *mux.Router) { + server := &peerRESTServer{} + subrouter := router.PathPrefix(peerRESTPath).Subrouter() + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodCPULoadInfo).HandlerFunc(httpTraceHdrs(server.CPULoadInfoHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodMemUsageInfo).HandlerFunc(httpTraceHdrs(server.MemUsageInfoHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDrivePerfInfo).HandlerFunc(httpTraceHdrs(server.DrivePerfInfoHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...) + + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketPolicyRemove).HandlerFunc(httpTraceAll(server.RemoveBucketPolicyHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketPolicySet).HandlerFunc(httpTraceHdrs(server.SetBucketPolicyHandler)).Queries(restQueries(peerRESTBucket)...) + + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodLoadUsers).HandlerFunc(httpTraceAll(server.LoadUsersHandler)) + + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProflingDataHandler)) + + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodLoadCredentials).HandlerFunc(httpTraceHdrs(server.LoadCredentialsHandler)) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodBucketNotificationListen).HandlerFunc(httpTraceHdrs(server.ListenBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...) + + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...) + + router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler)) +} diff --git a/cmd/peer-rpc-client.go b/cmd/peer-rpc-client.go deleted file mode 100644 index 16aae60d7..000000000 --- a/cmd/peer-rpc-client.go +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2018, 2019 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "crypto/tls" - - "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/event" - xnet "github.com/minio/minio/pkg/net" - "github.com/minio/minio/pkg/policy" -) - -// PeerRPCClient - peer RPC client talks to peer RPC server. -type PeerRPCClient struct { - *RPCClient -} - -// DeleteBucket - calls delete bucket RPC. -func (rpcClient *PeerRPCClient) DeleteBucket(bucketName string) error { - args := DeleteBucketArgs{BucketName: bucketName} - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".DeleteBucket", &args, &reply) -} - -// SetBucketPolicy - calls set bucket policy RPC. -func (rpcClient *PeerRPCClient) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) error { - args := SetBucketPolicyArgs{ - BucketName: bucketName, - Policy: *bucketPolicy, - } - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".SetBucketPolicy", &args, &reply) -} - -// RemoveBucketPolicy - calls remove bucket policy RPC. -func (rpcClient *PeerRPCClient) RemoveBucketPolicy(bucketName string) error { - args := RemoveBucketPolicyArgs{ - BucketName: bucketName, - } - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".RemoveBucketPolicy", &args, &reply) -} - -// PutBucketNotification - calls put bukcet notification RPC. -func (rpcClient *PeerRPCClient) PutBucketNotification(bucketName string, rulesMap event.RulesMap) error { - args := PutBucketNotificationArgs{ - BucketName: bucketName, - RulesMap: rulesMap, - } - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".PutBucketNotification", &args, &reply) -} - -// ListenBucketNotification - calls listen bucket notification RPC. -func (rpcClient *PeerRPCClient) ListenBucketNotification(bucketName string, eventNames []event.Name, - pattern string, targetID event.TargetID, addr xnet.Host) error { - args := ListenBucketNotificationArgs{ - BucketName: bucketName, - EventNames: eventNames, - Pattern: pattern, - TargetID: targetID, - Addr: addr, - } - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".ListenBucketNotification", &args, &reply) -} - -// RemoteTargetExist - calls remote target ID exist RPC. -func (rpcClient *PeerRPCClient) RemoteTargetExist(bucketName string, targetID event.TargetID) (bool, error) { - args := RemoteTargetExistArgs{ - BucketName: bucketName, - TargetID: targetID, - } - var reply bool - - err := rpcClient.Call(peerServiceName+".RemoteTargetExist", &args, &reply) - return reply, err -} - -// SendEvent - calls send event RPC. -func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { - args := SendEventArgs{ - BucketName: bucketName, - TargetID: remoteTargetID, - Event: eventData, - } - var reply bool - - err := rpcClient.Call(peerServiceName+".SendEvent", &args, &reply) - if err != nil && !reply { - reqInfo := &logger.ReqInfo{BucketName: bucketName} - reqInfo.AppendTags("targetID", targetID.Name) - reqInfo.AppendTags("event", eventData.EventName.String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - globalNotificationSys.RemoveRemoteTarget(bucketName, targetID) - } - - return err -} - -// ReloadFormat - calls reload format RPC. -func (rpcClient *PeerRPCClient) ReloadFormat(dryRun bool) error { - args := ReloadFormatArgs{ - DryRun: dryRun, - } - reply := VoidReply{} - - return rpcClient.Call(peerServiceName+".ReloadFormat", &args, &reply) -} - -// LoadUsers - calls load users RPC. -func (rpcClient *PeerRPCClient) LoadUsers() error { - args := AuthArgs{} - reply := VoidReply{} - - return rpcClient.Call(peerServiceName+".LoadUsers", &args, &reply) -} - -// LoadCredentials - calls load credentials RPC. -func (rpcClient *PeerRPCClient) LoadCredentials() error { - args := AuthArgs{} - reply := VoidReply{} - - return rpcClient.Call(peerServiceName+".LoadCredentials", &args, &reply) -} - -// DrivePerfInfo - returns drive performance info for remote server. -func (rpcClient *PeerRPCClient) DrivePerfInfo() (ServerDrivesPerfInfo, error) { - args := AuthArgs{} - var reply ServerDrivesPerfInfo - - err := rpcClient.Call(peerServiceName+".DrivePerfInfo", &args, &reply) - return reply, err -} - -// MemUsageInfo - returns mem utilization info for remote server -func (rpcClient *PeerRPCClient) MemUsageInfo() (ServerMemUsageInfo, error) { - args := AuthArgs{} - var reply ServerMemUsageInfo - - err := rpcClient.Call(peerServiceName+".MemUsageInfo", &args, &reply) - return reply, err -} - -// CPULoadInfo - returns cpu performance info for remote server -func (rpcClient *PeerRPCClient) CPULoadInfo() (ServerCPULoadInfo, error) { - args := AuthArgs{} - var reply ServerCPULoadInfo - - err := rpcClient.Call(peerServiceName+".CPULoadInfo", &args, &reply) - return reply, err -} - -// StartProfiling - starts profiling on the remote server. -func (rpcClient *PeerRPCClient) StartProfiling(profiler string) error { - args := StartProfilingArgs{Profiler: profiler} - reply := VoidReply{} - return rpcClient.Call(peerServiceName+".StartProfiling", &args, &reply) -} - -// DownloadProfilingData - download already running profiling on the remote server. -func (rpcClient *PeerRPCClient) DownloadProfilingData() ([]byte, error) { - args := AuthArgs{} - var reply []byte - err := rpcClient.Call(peerServiceName+".DownloadProfilingData", &args, &reply) - return reply, err -} - -// SignalService - calls load server info RPC. -func (rpcClient *PeerRPCClient) SignalService(sig serviceSignal) error { - args := SignalServiceArgs{Sig: sig} - reply := VoidReply{} - - return rpcClient.Call(peerServiceName+".SignalService", &args, &reply) -} - -// ServerInfo - calls load server info RPC. -func (rpcClient *PeerRPCClient) ServerInfo() (ServerInfoData, error) { - args := AuthArgs{} - reply := ServerInfoData{} - - err := rpcClient.Call(peerServiceName+".ServerInfo", &args, &reply) - return reply, err -} - -// GetLocksResp stores various info from the client for each lock that is requested. -type GetLocksResp map[string][]lockRequesterInfo - -// GetLocks - returns the lock information on the server to which the RPC call is made. -func (rpcClient *PeerRPCClient) GetLocks() (resp GetLocksResp, err error) { - err = rpcClient.Call(peerServiceName+".GetLocks", &AuthArgs{}, &resp) - return resp, err -} - -// NewPeerRPCClient - returns new peer RPC client. -func NewPeerRPCClient(host *xnet.Host) (*PeerRPCClient, error) { - scheme := "http" - if globalIsSSL { - scheme = "https" - } - - serviceURL := &xnet.URL{ - Scheme: scheme, - Host: host.String(), - Path: peerServicePath, - } - - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: host.Name, - RootCAs: globalRootCAs, - } - } - - rpcClient, err := NewRPCClient( - RPCClientArgs{ - NewAuthTokenFunc: newAuthToken, - RPCVersion: globalRPCAPIVersion, - ServiceName: peerServiceName, - ServiceURL: serviceURL, - TLSConfig: tlsConfig, - }, - ) - if err != nil { - return nil, err - } - - return &PeerRPCClient{rpcClient}, nil -} - -// makeRemoteRPCClients - creates Peer RPCClients for given endpoint list. -func makeRemoteRPCClients(endpoints EndpointList) map[xnet.Host]*PeerRPCClient { - peerRPCClientMap := make(map[xnet.Host]*PeerRPCClient) - for _, hostStr := range GetRemotePeers(endpoints) { - host, err := xnet.ParseHost(hostStr) - logger.FatalIf(err, "Unable to parse peer RPC Host") - rpcClient, err := NewPeerRPCClient(host) - logger.FatalIf(err, "Unable to parse peer RPC Client") - peerRPCClientMap[*host] = rpcClient - } - - return peerRPCClientMap -} diff --git a/cmd/peer-rpc-server.go b/cmd/peer-rpc-server.go deleted file mode 100644 index fc8fbbdaa..000000000 --- a/cmd/peer-rpc-server.go +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2018, 2019 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "fmt" - "path" - "sort" - "time" - - "github.com/gorilla/mux" - "github.com/minio/minio/cmd/logger" - xrpc "github.com/minio/minio/cmd/rpc" - "github.com/minio/minio/pkg/event" - xnet "github.com/minio/minio/pkg/net" - "github.com/minio/minio/pkg/policy" -) - -const peerServiceName = "Peer" -const peerServiceSubPath = "/peer/remote" - -var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath) - -// peerRPCReceiver - Peer RPC receiver for peer RPC server. -type peerRPCReceiver struct{} - -// DeleteBucketArgs - delete bucket RPC arguments. -type DeleteBucketArgs struct { - AuthArgs - BucketName string -} - -// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object. -func (receiver *peerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - globalNotificationSys.RemoveNotification(args.BucketName) - globalPolicySys.Remove(args.BucketName) - return nil -} - -// SetBucketPolicyArgs - set bucket policy RPC arguments. -type SetBucketPolicyArgs struct { - AuthArgs - BucketName string - Policy policy.Policy -} - -// SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys. -func (receiver *peerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - globalPolicySys.Set(args.BucketName, args.Policy) - return nil -} - -// RemoveBucketPolicyArgs - delete bucket policy RPC arguments. -type RemoveBucketPolicyArgs struct { - AuthArgs - BucketName string -} - -// RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys. -func (receiver *peerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - globalPolicySys.Remove(args.BucketName) - return nil -} - -// PutBucketNotificationArgs - put bucket notification RPC arguments. -type PutBucketNotificationArgs struct { - AuthArgs - BucketName string - RulesMap event.RulesMap -} - -// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object. -func (receiver *peerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap) - return nil -} - -// ListenBucketNotificationArgs - listen bucket notification RPC arguments. -type ListenBucketNotificationArgs struct { - AuthArgs `json:"-"` - BucketName string `json:"-"` - EventNames []event.Name `json:"eventNames"` - Pattern string `json:"pattern"` - TargetID event.TargetID `json:"targetId"` - Addr xnet.Host `json:"addr"` -} - -// ListenBucketNotification - handles listen bucket notification RPC call. -// It creates PeerRPCClient target which pushes requested events to target in remote peer. -func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr) - if rpcClient == nil { - return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr) - } - - target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient) - rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) - if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil { - reqInfo := &logger.ReqInfo{BucketName: target.bucketName} - reqInfo.AppendTags("target", target.id.Name) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return err - } - return nil -} - -// RemoteTargetExistArgs - remote target ID exist RPC arguments. -type RemoteTargetExistArgs struct { - AuthArgs - BucketName string - TargetID event.TargetID -} - -// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not. -func (receiver *peerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *bool) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - *reply = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID) - return nil -} - -// SendEventArgs - send event RPC arguments. -type SendEventArgs struct { - AuthArgs - Event event.Event - TargetID event.TargetID - BucketName string -} - -// SendEvent - handles send event RPC call which sends given event to target by given target ID. -func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - // Set default to true to keep the target. - *reply = true - errs := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID) - - for i := range errs { - reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String()) - reqInfo.AppendTags("targetName", args.TargetID.Name) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, errs[i].Err) - - *reply = false // send failed i.e. do not keep the target. - return errs[i].Err - } - - return nil -} - -// ReloadFormatArgs - send event RPC arguments. -type ReloadFormatArgs struct { - AuthArgs - DryRun bool -} - -// ReloadFormat - handles reload format RPC call, reloads latest `format.json` -func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - return objAPI.ReloadFormat(context.Background(), args.DryRun) -} - -// LoadUsers - handles load users RPC call. -func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - return globalIAMSys.Load(objAPI) -} - -// LoadCredentials - handles load credentials RPC call. -func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - // Construct path to config.json for the given bucket. - configFile := path.Join(bucketConfigPrefix, minioConfigFile) - transactionConfigFile := configFile + ".transaction" - - // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket - // and configFile, take a transaction lock to avoid race. - objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) - if err := objLock.GetRLock(globalOperationTimeout); err != nil { - return err - } - objLock.RUnlock() - - return globalConfigSys.Load(newObjectLayerFn()) -} - -// DrivePerfInfo - handles drive performance RPC call -func (receiver *peerRPCReceiver) DrivePerfInfo(args *AuthArgs, reply *ServerDrivesPerfInfo) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - *reply = localEndpointsDrivePerf(globalEndpoints) - return nil -} - -// CPULoadInfo - handles cpu performance RPC call -func (receiver *peerRPCReceiver) CPULoadInfo(args *AuthArgs, reply *ServerCPULoadInfo) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - *reply = localEndpointsCPULoad(globalEndpoints) - return nil -} - -// MemUsageInfo - handles mem utilization RPC call -func (receiver *peerRPCReceiver) MemUsageInfo(args *AuthArgs, reply *ServerMemUsageInfo) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - *reply = localEndpointsMemUsage(globalEndpoints) - return nil -} - -// uptimes - used to sort uptimes in chronological order. -type uptimes []time.Duration - -func (ts uptimes) Len() int { - return len(ts) -} - -func (ts uptimes) Less(i, j int) bool { - return ts[i] < ts[j] -} - -func (ts uptimes) Swap(i, j int) { - ts[i], ts[j] = ts[j], ts[i] -} - -// getPeerUptimes - returns the uptime. -func getPeerUptimes(serverInfo []ServerInfo) time.Duration { - // In a single node Erasure or FS backend setup the uptime of - // the setup is the uptime of the single minio server - // instance. - if !globalIsDistXL { - return UTCNow().Sub(globalBootTime) - } - - var times []time.Duration - - for _, info := range serverInfo { - if info.Error != "" { - continue - } - times = append(times, info.Data.Properties.Uptime) - } - - // Sort uptimes in chronological order. - sort.Sort(uptimes(times)) - - // Return the latest time as the uptime. - return times[0] -} - -// StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call -type StartProfilingArgs struct { - AuthArgs - Profiler string -} - -// StartProfiling - profiling server receiver. -func (receiver *peerRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error { - if globalProfiler != nil { - globalProfiler.Stop() - } - var err error - globalProfiler, err = startProfiler(args.Profiler, "") - return err -} - -// DownloadProfilingData - download profiling data. -func (receiver *peerRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) error { - var err error - *reply, err = getProfileData() - return err -} - -var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") - -// SignalServiceArgs - send event RPC arguments. -type SignalServiceArgs struct { - AuthArgs - Sig serviceSignal -} - -// SignalService - signal service receiver. -func (receiver *peerRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error { - switch args.Sig { - case serviceRestart, serviceStop: - globalServiceSignalCh <- args.Sig - default: - return errUnsupportedSignal - } - return nil -} - -// ServerInfo - server info receiver. -func (receiver *peerRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) error { - if globalBootTime.IsZero() { - return errServerNotInitialized - } - - // Build storage info - objLayer := newObjectLayerFn() - if objLayer == nil { - return errServerNotInitialized - } - - // Server info data. - *reply = ServerInfoData{ - StorageInfo: objLayer.StorageInfo(context.Background()), - ConnStats: globalConnStats.toServerConnStats(), - HTTPStats: globalHTTPStats.toServerHTTPStats(), - Properties: ServerProperties{ - Uptime: UTCNow().Sub(globalBootTime), - Version: Version, - CommitID: CommitID, - SQSARN: globalNotificationSys.GetARNList(), - Region: globalServerConfig.GetRegion(), - }, - } - - return nil -} - -// GetLocks - Get Locks receiver. -func (receiver *peerRPCReceiver) GetLocks(args *AuthArgs, reply *GetLocksResp) error { - if globalBootTime.IsZero() { - return errServerNotInitialized - } - - // Build storage info - objLayer := newObjectLayerFn() - if objLayer == nil { - return errServerNotInitialized - } - - // Locks data. - *reply = globalLockServer.ll.DupLockMap() - - return nil -} - -// NewPeerRPCServer - returns new peer RPC server. -func NewPeerRPCServer() (*xrpc.Server, error) { - rpcServer := xrpc.NewServer() - if err := rpcServer.RegisterName(peerServiceName, &peerRPCReceiver{}); err != nil { - return nil, err - } - return rpcServer, nil -} - -// registerPeerRPCRouter - creates and registers Peer RPC server and its router. -func registerPeerRPCRouter(router *mux.Router) { - rpcServer, err := NewPeerRPCServer() - logger.FatalIf(err, "Unable to initialize peer RPC Server") - subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter() - subrouter.Path(peerServiceSubPath).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP)) -} diff --git a/cmd/routers.go b/cmd/routers.go index 7952992ce..c050c2e4a 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -38,11 +38,12 @@ func registerDistXLRouters(router *mux.Router, endpoints EndpointList) { // Register storage rpc router only if its a distributed setup. registerStorageRESTHandlers(router, endpoints) + // Register peer REST router only if its a distributed setup. + registerPeerRESTHandlers(router) + // Register distributed namespace lock. registerDistNSLockRouter(router) - // Register peer communication router. - registerPeerRPCRouter(router) } // List of some generic handlers which are applied for all incoming requests. diff --git a/cmd/service.go b/cmd/service.go index fb37052a5..1a31a9516 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -22,12 +22,12 @@ import ( ) // Type of service signals currently supported. -type serviceSignal int +type serviceSignal string const ( - serviceStatus = iota // Gets status about the service. - serviceRestart // Restarts the service. - serviceStop // Stops the server. + serviceStatus serviceSignal = "serviceStatus" // Gets status about the service. + serviceRestart = "serviceRestart" // Restarts the service. + serviceStop = "serviceStop" // Stops the server. // Add new service requests here. )