mirror of
https://github.com/minio/minio.git
synced 2025-11-22 02:35:30 -05:00
Implement HTTP POST based RPC (#5840)
Added support for new RPC support using HTTP POST. RPC's arguments and reply are Gob encoded and sent as HTTP request/response body. This patch also removes Go RPC based implementation.
This commit is contained in:
207
cmd/peer-rpc-server.go
Normal file
207
cmd/peer-rpc-server.go
Normal file
@@ -0,0 +1,207 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
xrpc "github.com/minio/minio/cmd/rpc"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
"github.com/minio/minio/pkg/policy"
|
||||
)
|
||||
|
||||
const peerServiceName = "Peer"
|
||||
const peerServiceSubPath = "/s3/remote"
|
||||
|
||||
var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath)
|
||||
|
||||
// peerRPCReceiver - Peer RPC receiver for peer RPC server.
|
||||
type peerRPCReceiver struct{}
|
||||
|
||||
// DeleteBucketArgs - delete bucket RPC arguments.
|
||||
type DeleteBucketArgs struct {
|
||||
AuthArgs
|
||||
BucketName string
|
||||
}
|
||||
|
||||
// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
|
||||
func (receiver *peerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *VoidReply) error {
|
||||
globalNotificationSys.RemoveNotification(args.BucketName)
|
||||
globalPolicySys.Remove(args.BucketName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetBucketPolicyArgs - set bucket policy RPC arguments.
|
||||
type SetBucketPolicyArgs struct {
|
||||
AuthArgs
|
||||
BucketName string
|
||||
Policy policy.Policy
|
||||
}
|
||||
|
||||
// SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys.
|
||||
func (receiver *peerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *VoidReply) error {
|
||||
globalPolicySys.Set(args.BucketName, args.Policy)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveBucketPolicyArgs - delete bucket policy RPC arguments.
|
||||
type RemoveBucketPolicyArgs struct {
|
||||
AuthArgs
|
||||
BucketName string
|
||||
}
|
||||
|
||||
// RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys.
|
||||
func (receiver *peerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *VoidReply) error {
|
||||
globalPolicySys.Remove(args.BucketName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutBucketNotificationArgs - put bucket notification RPC arguments.
|
||||
type PutBucketNotificationArgs struct {
|
||||
AuthArgs
|
||||
BucketName string
|
||||
RulesMap event.RulesMap
|
||||
}
|
||||
|
||||
// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object.
|
||||
func (receiver *peerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *VoidReply) error {
|
||||
globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
|
||||
type ListenBucketNotificationArgs struct {
|
||||
AuthArgs `json:"-"`
|
||||
BucketName string `json:"-"`
|
||||
EventNames []event.Name `json:"eventNames"`
|
||||
Pattern string `json:"pattern"`
|
||||
TargetID event.TargetID `json:"targetId"`
|
||||
Addr xnet.Host `json:"addr"`
|
||||
}
|
||||
|
||||
// ListenBucketNotification - handles listen bucket notification RPC call. It creates PeerRPCClient target which pushes requested events to target in remote peer.
|
||||
func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error {
|
||||
rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr)
|
||||
if rpcClient == nil {
|
||||
return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)
|
||||
}
|
||||
|
||||
target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient)
|
||||
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
|
||||
if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil {
|
||||
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
|
||||
reqInfo.AppendTags("target", target.id.Name)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteTargetExistArgs - remote target ID exist RPC arguments.
|
||||
type RemoteTargetExistArgs struct {
|
||||
AuthArgs
|
||||
BucketName string
|
||||
TargetID event.TargetID
|
||||
}
|
||||
|
||||
// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
|
||||
func (receiver *peerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *bool) error {
|
||||
*reply = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendEventArgs - send event RPC arguments.
|
||||
type SendEventArgs struct {
|
||||
AuthArgs
|
||||
Event event.Event
|
||||
TargetID event.TargetID
|
||||
BucketName string
|
||||
}
|
||||
|
||||
// SendEvent - handles send event RPC call which sends given event to target by given target ID.
|
||||
func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) error {
|
||||
// Set default to true to keep the target.
|
||||
*reply = true
|
||||
errs := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID)
|
||||
|
||||
for i := range errs {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
|
||||
reqInfo.AppendTags("targetName", args.TargetID.Name)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, errs[i].Err)
|
||||
|
||||
*reply = false // send failed i.e. do not keep the target.
|
||||
return errs[i].Err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetCredentialsArgs - set credentials RPC arguments.
|
||||
type SetCredentialsArgs struct {
|
||||
AuthArgs
|
||||
Credentials auth.Credentials
|
||||
}
|
||||
|
||||
// SetCredentials - handles set credentials RPC call.
|
||||
func (receiver *peerRPCReceiver) SetCredentials(args *SetCredentialsArgs, reply *VoidReply) error {
|
||||
if !args.Credentials.IsValid() {
|
||||
return fmt.Errorf("invalid credentials passed")
|
||||
}
|
||||
|
||||
// Acquire lock before updating global configuration.
|
||||
globalServerConfigMu.Lock()
|
||||
defer globalServerConfigMu.Unlock()
|
||||
|
||||
// Update credentials in memory
|
||||
prevCred := globalServerConfig.SetCredential(args.Credentials)
|
||||
|
||||
// Save credentials to config file
|
||||
if err := globalServerConfig.Save(); err != nil {
|
||||
// As saving configurstion failed, restore previous credential in memory.
|
||||
globalServerConfig.SetCredential(prevCred)
|
||||
|
||||
logger.LogIf(context.Background(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewPeerRPCServer - returns new peer RPC server.
|
||||
func NewPeerRPCServer() (*xrpc.Server, error) {
|
||||
rpcServer := xrpc.NewServer()
|
||||
if err := rpcServer.RegisterName(peerServiceName, &peerRPCReceiver{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rpcServer, nil
|
||||
}
|
||||
|
||||
// registerPeerRPCRouter - creates and registers Peer RPC server and its router.
|
||||
func registerPeerRPCRouter(router *mux.Router) {
|
||||
rpcServer, err := NewPeerRPCServer()
|
||||
logger.CriticalIf(context.Background(), err)
|
||||
subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter()
|
||||
subrouter.Path(peerServiceSubPath).Handler(rpcServer)
|
||||
}
|
||||
Reference in New Issue
Block a user