/*
 * Minio Cloud Storage, (C) 2018 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"fmt"
	"path"

	"github.com/gorilla/mux"
	xerrors "github.com/minio/minio/pkg/errors"
	"github.com/minio/minio/pkg/event"
	xnet "github.com/minio/minio/pkg/net"
)

// s3Path is the route, relative to minioReservedBucketPath, at which the
// peer RPC server is mounted (see registerS3PeerRPCRouter). Peer RPC clients
// join it onto minioReservedBucketPath to build their service endpoint.
const s3Path = "/s3/remote"

// PeerRPCReceiver - Peer RPC receiver for peer RPC server.
//
// It embeds AuthRPCServer and is registered under the service name "Peer"
// by registerS3PeerRPCRouter, so its exported methods are reachable as
// "Peer.<Method>" RPC calls issued by PeerRPCClient.
type PeerRPCReceiver struct {
	AuthRPCServer
}

// DeleteBucketArgs - delete bucket RPC arguments.
//
// BucketName names the bucket whose notification state the receiving peer
// removes. AuthRPCArgs is embedded like in every other peer RPC argument
// type in this file (its definition lives elsewhere in the package).
type DeleteBucketArgs struct {
	AuthRPCArgs
	BucketName string
}

// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
//
// The caller must be authenticated: when args.IsAuthenticated fails, that
// error is returned and globalNotificationSys is left untouched. On success
// the notification state of args.BucketName is removed and nil is returned.
// reply is never populated.
//
// NOTE(review): reply is declared as *AuthRPCArgs while
// PeerRPCClient.DeleteBucket passes an AuthRPCReply. The signature is kept
// unchanged for compatibility - confirm which type is intended.
func (receiver *PeerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *AuthRPCArgs) error {
	// Reject unauthenticated callers, consistent with the other peer RPC
	// handlers that mutate globalNotificationSys.
	if err := args.IsAuthenticated(); err != nil {
		return err
	}

	globalNotificationSys.RemoveNotification(args.BucketName)
	return nil
}

// UpdateBucketPolicyArgs - update bucket policy RPC arguments.
//
// BucketName names the bucket whose policy the receiving peer should
// refresh. AuthRPCArgs is embedded like in every other peer RPC argument
// type in this file (its definition lives elsewhere in the package).
type UpdateBucketPolicyArgs struct {
	AuthRPCArgs
	BucketName string
}

// UpdateBucketPolicy - handles update bucket policy RPC call which sets bucket policies to given bucket in global BucketPolicies object.
//
// The caller must be authenticated: when args.IsAuthenticated fails, that
// error is returned and nothing is refreshed. If the object layer is not
// available yet the call is a successful no-op; otherwise the result of
// RefreshBucketPolicy for args.BucketName is returned. reply is never
// populated.
//
// NOTE(review): reply is declared as *AuthRPCArgs while
// PeerRPCClient.UpdateBucketPolicy passes an AuthRPCReply. The signature is
// kept unchanged for compatibility - confirm which type is intended.
func (receiver *PeerRPCReceiver) UpdateBucketPolicy(args *UpdateBucketPolicyArgs, reply *AuthRPCArgs) error {
	// Reject unauthenticated callers, consistent with the other peer RPC
	// handlers in this file.
	if err := args.IsAuthenticated(); err != nil {
		return err
	}

	objectAPI := newObjectLayerFn()
	if objectAPI == nil {
		// If the object layer is just coming up then it will load the policy from the disk.
		return nil
	}
	return objectAPI.RefreshBucketPolicy(context.Background(), args.BucketName)
}

// PutBucketNotificationArgs - put bucket notification RPC arguments.
//
// BucketName names the bucket and RulesMap holds the notification rules the
// receiving peer adds for it. The embedded AuthRPCArgs provides
// IsAuthenticated, which the handler calls before applying the rules.
type PutBucketNotificationArgs struct {
	AuthRPCArgs
	BucketName string
	RulesMap   event.RulesMap
}

// PutBucketNotification - handles put bucket notification RPC call.
//
// After the caller has been authenticated, the rules carried in
// args.RulesMap are added for args.BucketName to the global NotificationSys
// object. An authentication failure is returned as-is and no rules are
// added. reply is never populated.
func (receiver *PeerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *AuthRPCReply) error {
	authErr := args.IsAuthenticated()
	if authErr != nil {
		return authErr
	}

	bucket, rules := args.BucketName, args.RulesMap
	globalNotificationSys.AddRulesMap(bucket, rules)
	return nil
}

// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
//
// EventNames and Pattern select which events are forwarded, TargetID is the
// ID of the target on the requesting peer, and Addr is the address used to
// look up the PeerRPCClient that events are pushed back through.
//
// The embedded AuthRPCArgs and BucketName are tagged `json:"-"`, so they are
// left out when the value is encoded with encoding/json; the remaining
// fields use the JSON names given in their tags.
type ListenBucketNotificationArgs struct {
	AuthRPCArgs `json:"-"`
	BucketName  string         `json:"-"`
	EventNames  []event.Name   `json:"eventNames"`
	Pattern     string         `json:"pattern"`
	TargetID    event.TargetID `json:"targetId"`
	Addr        xnet.Host      `json:"addr"`
}

// ListenBucketNotification - handles listen bucket notification RPC call.
//
// After authenticating the caller it looks up the PeerRPCClient registered
// for args.Addr, wraps it in a PeerRPCClientTarget bound to args.TargetID
// and registers that target, together with a rules map built from
// args.EventNames and args.Pattern, as a remote target of args.BucketName.
//
// It returns the authentication error, an error when no PeerRPCClient is
// known for args.Addr, or the error reported by AddRemoteTarget (which is
// also logged). reply is never populated.
func (receiver *PeerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *AuthRPCReply) error {
	authErr := args.IsAuthenticated()
	if authErr != nil {
		return authErr
	}

	peerClient := globalNotificationSys.GetPeerRPCClient(args.Addr)
	if peerClient == nil {
		return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)
	}

	// The rules are keyed by the ID of the freshly created target, not by
	// the target ID supplied by the remote peer.
	remoteTarget := NewPeerRPCClientTarget(args.BucketName, args.TargetID, peerClient)
	rules := event.NewRulesMap(args.EventNames, args.Pattern, remoteTarget.ID())

	addErr := globalNotificationSys.AddRemoteTarget(args.BucketName, remoteTarget, rules)
	if addErr == nil {
		return nil
	}

	errorIf(addErr, "Unable to add PeerRPCClientTarget %v to globalNotificationSys.targetList.", remoteTarget)
	return addErr
}

// RemoteTargetExistArgs - remote target ID exist RPC arguments.
//
// BucketName and TargetID identify the remote target whose existence is
// queried on the receiving peer. AuthRPCArgs is embedded like in every other
// peer RPC argument type in this file.
type RemoteTargetExistArgs struct {
	AuthRPCArgs
	BucketName string
	TargetID   event.TargetID
}

// RemoteTargetExistReply - remote target ID exist RPC reply.
//
// Exist carries the result of globalNotificationSys.RemoteTargetExist on the
// receiving peer for the queried bucket and target ID.
type RemoteTargetExistReply struct {
	AuthRPCReply
	Exist bool
}

// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
//
// The caller must be authenticated: when args.IsAuthenticated fails, that
// error is returned and reply is left untouched. On success reply.Exist is
// set to the result of globalNotificationSys.RemoteTargetExist for
// args.BucketName and args.TargetID, and nil is returned.
func (receiver *PeerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *RemoteTargetExistReply) error {
	// Reject unauthenticated callers, consistent with the other peer RPC
	// handlers in this file.
	if err := args.IsAuthenticated(); err != nil {
		return err
	}

	reply.Exist = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID)
	return nil
}

// SendEventArgs - send event RPC arguments.
//
// Event is the event to deliver, TargetID is the ID of the target on the
// receiving peer that should get it, and BucketName is the bucket the event
// belongs to. The embedded AuthRPCArgs provides IsAuthenticated, which the
// handler calls before sending.
type SendEventArgs struct {
	AuthRPCArgs
	Event      event.Event
	TargetID   event.TargetID
	BucketName string
}

// SendEventReply - send event RPC reply.
//
// Error is the delivery error reported by the receiving peer for the
// requested target, or nil when the event was sent successfully. It is
// distinct from the error returned by the RPC call itself.
//
// NOTE(review): Error is an interface-typed field. If the RPC codec is
// encoding/gob, a non-nil value can only be transmitted when its concrete
// type is registered with gob (gob.Register) - confirm how the codec in use
// handles this.
type SendEventReply struct {
	AuthRPCReply
	Error error
}

// SendEvent - handles send event RPC call which sends given event to target by given target ID.
//
// After authenticating the caller, args.Event is sent to the single target
// args.TargetID of args.BucketName. A delivery failure is logged and handed
// back to the caller through reply.Error; the RPC itself still returns nil
// in that case. Only an authentication failure is returned as the RPC
// error.
//
// It panics if the send reports failures but none of them belongs to
// args.TargetID, since that would break the one-target invariant.
func (receiver *PeerRPCReceiver) SendEvent(args *SendEventArgs, reply *SendEventReply) error {
	authErr := args.IsAuthenticated()
	if authErr != nil {
		return authErr
	}

	var sendErr error
	failures := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID)
	if len(failures) > 0 {
		targetErr, present := failures[args.TargetID]
		if !present {
			// failures must be zero or one element map because we sent to only one target ID.
			panic(fmt.Errorf("error for target %v not found in error map %+v", args.TargetID, failures))
		}
		sendErr = targetErr
	}

	if sendErr != nil {
		errorIf(sendErr, "unable to send event %v to target %v", args.Event, args.TargetID)
	}

	reply.Error = sendErr
	return nil
}

// registerS3PeerRPCRouter - creates and registers Peer RPC server and its router.
//
// A new RPC server is created, a PeerRPCReceiver is registered on it under
// the service name "Peer", and the server is installed as the handler for
// s3Path beneath the minioReservedBucketPath prefix of router. A
// registration failure is returned wrapped with xerrors.Trace.
func registerS3PeerRPCRouter(router *mux.Router) error {
	rpcServer := newRPCServer()

	regErr := rpcServer.RegisterName("Peer", &PeerRPCReceiver{})
	if regErr != nil {
		return xerrors.Trace(regErr)
	}

	reservedRoutes := router.NewRoute().PathPrefix(minioReservedBucketPath).Subrouter()
	reservedRoutes.Path(s3Path).Handler(rpcServer)
	return nil
}

// PeerRPCClient - peer RPC client talks to peer RPC server.
//
// It embeds *AuthRPCClient, whose Call method is used by every method of
// this type to invoke the matching "Peer.<Method>" handler on the remote
// peer.
type PeerRPCClient struct {
	*AuthRPCClient
}

// DeleteBucket - calls delete bucket RPC.
//
// It invokes "Peer.DeleteBucket" on the remote peer for bucketName and
// returns the error reported by the call, if any.
func (rpcClient *PeerRPCClient) DeleteBucket(bucketName string) error {
	var (
		args  = DeleteBucketArgs{BucketName: bucketName}
		reply AuthRPCReply
	)
	return rpcClient.Call("Peer.DeleteBucket", &args, &reply)
}

// UpdateBucketPolicy - calls update bucket policy RPC.
//
// It invokes "Peer.UpdateBucketPolicy" on the remote peer for bucketName and
// returns the error reported by the call, if any.
func (rpcClient *PeerRPCClient) UpdateBucketPolicy(bucketName string) error {
	var (
		args  = UpdateBucketPolicyArgs{BucketName: bucketName}
		reply AuthRPCReply
	)
	return rpcClient.Call("Peer.UpdateBucketPolicy", &args, &reply)
}

// PutBucketNotification - calls put bucket notification RPC.
//
// It invokes "Peer.PutBucketNotification" on the remote peer, handing over
// rulesMap for bucketName, and returns the error reported by the call, if
// any.
func (rpcClient *PeerRPCClient) PutBucketNotification(bucketName string, rulesMap event.RulesMap) error {
	var (
		args  = PutBucketNotificationArgs{BucketName: bucketName, RulesMap: rulesMap}
		reply AuthRPCReply
	)
	return rpcClient.Call("Peer.PutBucketNotification", &args, &reply)
}

// ListenBucketNotification - calls listen bucket notification RPC.
//
// It invokes "Peer.ListenBucketNotification" on the remote peer, asking it
// to forward events of bucketName matching eventNames and pattern to the
// target targetID reachable at addr. The error reported by the call, if any,
// is returned.
func (rpcClient *PeerRPCClient) ListenBucketNotification(bucketName string, eventNames []event.Name,
	pattern string, targetID event.TargetID, addr xnet.Host) error {
	var reply AuthRPCReply

	var args ListenBucketNotificationArgs
	args.BucketName = bucketName
	args.EventNames = eventNames
	args.Pattern = pattern
	args.TargetID = targetID
	args.Addr = addr

	return rpcClient.Call("Peer.ListenBucketNotification", &args, &reply)
}

// RemoteTargetExist - calls remote target ID exist RPC.
//
// It invokes "Peer.RemoteTargetExist" on the remote peer for bucketName and
// targetID. On success it returns the Exist flag of the reply; when the call
// itself fails it returns false together with that error.
func (rpcClient *PeerRPCClient) RemoteTargetExist(bucketName string, targetID event.TargetID) (bool, error) {
	var reply RemoteTargetExistReply
	args := RemoteTargetExistArgs{BucketName: bucketName, TargetID: targetID}

	callErr := rpcClient.Call("Peer.RemoteTargetExist", &args, &reply)
	if callErr != nil {
		return false, callErr
	}
	return reply.Exist, nil
}

// SendEvent - calls send event RPC.
//
// It invokes "Peer.SendEvent" on the remote peer, asking it to deliver
// eventData of bucketName to its target remoteTargetID. targetID is the ID
// under which the corresponding remote target is registered in the local
// globalNotificationSys.
//
// A failure of the RPC call itself is returned unchanged. If the call
// succeeds but the peer reports a delivery error in the reply, that error is
// logged, the remote target targetID is removed from bucketName, and the
// delivery error is returned.
func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
	args := SendEventArgs{
		BucketName: bucketName,
		TargetID:   remoteTargetID,
		Event:      eventData,
	}
	reply := SendEventReply{}
	if err := rpcClient.Call("Peer.SendEvent", &args, &reply); err != nil {
		return err
	}

	if reply.Error != nil {
		// Log the event only: dumping the whole args struct would also
		// write the embedded authentication arguments to the log.
		errorIf(reply.Error, "unable to send event %v to rpc target %v of bucket %v", eventData, targetID, bucketName)
		globalNotificationSys.RemoveRemoteTarget(bucketName, targetID)
	}

	return reply.Error
}

// makeRemoteRPCClients - creates Peer RPCClients for given endpoint list.
//
// For every address returned by GetRemotePeers(endpoints) a PeerRPCClient is
// built with the server's current credential, the peer RPC service endpoint
// (minioReservedBucketPath joined with s3Path) and the global SSL setting.
// The clients are returned keyed by their parsed host. Address parsing uses
// xnet.MustParseHost.
func makeRemoteRPCClients(endpoints EndpointList) map[xnet.Host]*PeerRPCClient {
	clients := make(map[xnet.Host]*PeerRPCClient)

	credential := globalServerConfig.GetCredential()
	endpointPath := path.Join(minioReservedBucketPath, s3Path)

	for _, peerAddr := range GetRemotePeers(endpoints) {
		peerHost := xnet.MustParseHost(peerAddr)
		cfg := authConfig{
			accessKey:       credential.AccessKey,
			secretKey:       credential.SecretKey,
			serverAddr:      peerAddr,
			serviceEndpoint: endpointPath,
			secureConn:      globalIsSSL,
			serviceName:     "Peer",
		}
		clients[*peerHost] = &PeerRPCClient{AuthRPCClient: newAuthRPCClient(cfg)}
	}

	return clients
}

// PeerRPCClientTarget - RPCClient is an event.Target which sends event to target of remote peer.
//
// id is the ID this target reports locally (see ID), remoteTargetID is the
// ID of the target on the remote peer that events are delivered to,
// rpcClient is the client used to reach that peer, and bucketName is the
// bucket the target is attached to.
type PeerRPCClientTarget struct {
	id             event.TargetID
	remoteTargetID event.TargetID
	rpcClient      *PeerRPCClient
	bucketName     string
}

// ID - returns target ID.
//
// This is the local ID assigned when the target was created, not the ID of
// the target on the remote peer.
func (target *PeerRPCClientTarget) ID() event.TargetID {
	return target.id
}

// Send - sends event to remote peer by making RPC call.
//
// The event is forwarded through the target's PeerRPCClient to the remote
// target ID; the error returned by PeerRPCClient.SendEvent is passed back
// unchanged.
func (target *PeerRPCClientTarget) Send(eventData event.Event) error {
	client := target.rpcClient
	return client.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
}

// Close - does nothing and available for interface compatibility.
//
// It always returns nil; the target releases nothing here.
func (target *PeerRPCClientTarget) Close() error {
	return nil
}

// NewPeerRPCClientTarget - creates RPCClient target with given target ID available in remote peer.
//
// targetID is kept as the remote target ID. The new target's own ID reuses
// targetID.ID and takes targetID.Name suffixed with "+" and the value of
// mustGetUUID(). Events sent to the returned target are forwarded through
// rpcClient for bucketName.
func NewPeerRPCClientTarget(bucketName string, targetID event.TargetID, rpcClient *PeerRPCClient) *PeerRPCClientTarget {
	// Keyed fields: event.TargetID is declared in another package, so an
	// unkeyed literal would depend on that struct's field order.
	localID := event.TargetID{
		ID:   targetID.ID,
		Name: targetID.Name + "+" + mustGetUUID(),
	}

	return &PeerRPCClientTarget{
		id:             localID,
		remoteTargetID: targetID,
		bucketName:     bucketName,
		rpcClient:      rpcClient,
	}
}