2018-06-06 14:21:56 +05:30
/ *
* Minio Cloud Storage , ( C ) 2018 Minio , Inc .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package cmd
import (
"context"
"fmt"
"path"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
xrpc "github.com/minio/minio/cmd/rpc"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
)
const peerServiceName = "Peer"
const peerServiceSubPath = "/s3/remote"
var peerServicePath = path . Join ( minioReservedBucketPath , peerServiceSubPath )
// peerRPCReceiver - Peer RPC receiver for peer RPC server.
type peerRPCReceiver struct { }
// DeleteBucketArgs - delete bucket RPC arguments.
type DeleteBucketArgs struct {
AuthArgs
BucketName string
}
// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
func ( receiver * peerRPCReceiver ) DeleteBucket ( args * DeleteBucketArgs , reply * VoidReply ) error {
globalNotificationSys . RemoveNotification ( args . BucketName )
globalPolicySys . Remove ( args . BucketName )
return nil
}
// SetBucketPolicyArgs - set bucket policy RPC arguments.
type SetBucketPolicyArgs struct {
AuthArgs
BucketName string
Policy policy . Policy
}
// SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys.
func ( receiver * peerRPCReceiver ) SetBucketPolicy ( args * SetBucketPolicyArgs , reply * VoidReply ) error {
globalPolicySys . Set ( args . BucketName , args . Policy )
return nil
}
// RemoveBucketPolicyArgs - delete bucket policy RPC arguments.
type RemoveBucketPolicyArgs struct {
AuthArgs
BucketName string
}
// RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys.
func ( receiver * peerRPCReceiver ) RemoveBucketPolicy ( args * RemoveBucketPolicyArgs , reply * VoidReply ) error {
globalPolicySys . Remove ( args . BucketName )
return nil
}
// PutBucketNotificationArgs - put bucket notification RPC arguments.
type PutBucketNotificationArgs struct {
AuthArgs
BucketName string
RulesMap event . RulesMap
}
// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object.
func ( receiver * peerRPCReceiver ) PutBucketNotification ( args * PutBucketNotificationArgs , reply * VoidReply ) error {
globalNotificationSys . AddRulesMap ( args . BucketName , args . RulesMap )
return nil
}
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
type ListenBucketNotificationArgs struct {
AuthArgs ` json:"-" `
BucketName string ` json:"-" `
EventNames [ ] event . Name ` json:"eventNames" `
Pattern string ` json:"pattern" `
TargetID event . TargetID ` json:"targetId" `
Addr xnet . Host ` json:"addr" `
}
// ListenBucketNotification - handles listen bucket notification RPC call. It creates PeerRPCClient target which pushes requested events to target in remote peer.
func ( receiver * peerRPCReceiver ) ListenBucketNotification ( args * ListenBucketNotificationArgs , reply * VoidReply ) error {
rpcClient := globalNotificationSys . GetPeerRPCClient ( args . Addr )
if rpcClient == nil {
return fmt . Errorf ( "unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints" , args . Addr )
}
target := NewPeerRPCClientTarget ( args . BucketName , args . TargetID , rpcClient )
rulesMap := event . NewRulesMap ( args . EventNames , args . Pattern , target . ID ( ) )
if err := globalNotificationSys . AddRemoteTarget ( args . BucketName , target , rulesMap ) ; err != nil {
reqInfo := & logger . ReqInfo { BucketName : target . bucketName }
reqInfo . AppendTags ( "target" , target . id . Name )
ctx := logger . SetReqInfo ( context . Background ( ) , reqInfo )
logger . LogIf ( ctx , err )
return err
}
return nil
}
// RemoteTargetExistArgs - remote target ID exist RPC arguments.
type RemoteTargetExistArgs struct {
AuthArgs
BucketName string
TargetID event . TargetID
}
// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
func ( receiver * peerRPCReceiver ) RemoteTargetExist ( args * RemoteTargetExistArgs , reply * bool ) error {
* reply = globalNotificationSys . RemoteTargetExist ( args . BucketName , args . TargetID )
return nil
}
// SendEventArgs - send event RPC arguments.
type SendEventArgs struct {
AuthArgs
Event event . Event
TargetID event . TargetID
BucketName string
}
// SendEvent - handles send event RPC call which sends given event to target by given target ID.
func ( receiver * peerRPCReceiver ) SendEvent ( args * SendEventArgs , reply * bool ) error {
// Set default to true to keep the target.
* reply = true
errs := globalNotificationSys . send ( args . BucketName , args . Event , args . TargetID )
for i := range errs {
reqInfo := ( & logger . ReqInfo { } ) . AppendTags ( "Event" , args . Event . EventName . String ( ) )
reqInfo . AppendTags ( "targetName" , args . TargetID . Name )
ctx := logger . SetReqInfo ( context . Background ( ) , reqInfo )
logger . LogIf ( ctx , errs [ i ] . Err )
* reply = false // send failed i.e. do not keep the target.
return errs [ i ] . Err
}
return nil
}
// SetCredentialsArgs - set credentials RPC arguments.
type SetCredentialsArgs struct {
AuthArgs
Credentials auth . Credentials
}
// SetCredentials - handles set credentials RPC call.
func ( receiver * peerRPCReceiver ) SetCredentials ( args * SetCredentialsArgs , reply * VoidReply ) error {
if ! args . Credentials . IsValid ( ) {
return fmt . Errorf ( "invalid credentials passed" )
}
// Acquire lock before updating global configuration.
globalServerConfigMu . Lock ( )
defer globalServerConfigMu . Unlock ( )
// Update credentials in memory
prevCred := globalServerConfig . SetCredential ( args . Credentials )
// Save credentials to config file
2018-05-16 06:50:22 +05:30
if err := globalServerConfig . Save ( getConfigFile ( ) ) ; err != nil {
2018-06-06 14:21:56 +05:30
// As saving configurstion failed, restore previous credential in memory.
globalServerConfig . SetCredential ( prevCred )
logger . LogIf ( context . Background ( ) , err )
return err
}
return nil
}
// NewPeerRPCServer - returns new peer RPC server.
func NewPeerRPCServer ( ) ( * xrpc . Server , error ) {
rpcServer := xrpc . NewServer ( )
if err := rpcServer . RegisterName ( peerServiceName , & peerRPCReceiver { } ) ; err != nil {
return nil , err
}
return rpcServer , nil
}
// registerPeerRPCRouter - creates and registers Peer RPC server and its router.
func registerPeerRPCRouter ( router * mux . Router ) {
rpcServer , err := NewPeerRPCServer ( )
2018-06-14 10:17:07 -07:00
logger . FatalIf ( err , "Unable to initialize peer RPC Server" , context . Background ( ) )
2018-06-06 14:21:56 +05:30
subrouter := router . PathPrefix ( minioReservedBucketPath ) . Subrouter ( )
subrouter . Path ( peerServiceSubPath ) . Handler ( rpcServer )
}