/*
 * MinIO Cloud Storage, (C) 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package target

import (
	"encoding/json"
	"errors"
	"net/url"

	"github.com/minio/minio/pkg/event"
	xnet "github.com/minio/minio/pkg/net"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/stan.go"
)

// NATSArgs - NATS target arguments.
//
// Fields:
//   - Enable: when false, Validate accepts the arguments without any
//     further checks.
//   - Address: NATS server host; must be non-empty when Enable is set.
//   - Subject: subject that events are published to; must be non-empty
//     when Enable is set.
//   - Username, Password: credentials supplied when connecting.
//   - Token: authentication token supplied when connecting.
//   - Secure: selects the "tls" URL scheme for streaming connections and
//     sets the Secure option on plain NATS connections.
//   - PingInterval: NOTE(review): not read by any code visible in this
//     file; confirm where it is consumed and which unit it is in.
//   - Streaming.Enable: connect through NATS Streaming (stan) instead of
//     plain NATS.
//   - Streaming.ClusterID: streaming cluster to connect to; must be
//     non-empty when Streaming.Enable is set.
//   - Streaming.Async: publish with PublishAsync instead of Publish.
//   - Streaming.MaxPubAcksInflight: passed as a stan connection option
//     only when greater than zero.
type NATSArgs struct {
	Enable       bool      `json:"enable"`
	Address      xnet.Host `json:"address"`
	Subject      string    `json:"subject"`
	Username     string    `json:"username"`
	Password     string    `json:"password"`
	Token        string    `json:"token"`
	Secure       bool      `json:"secure"`
	PingInterval int64     `json:"pingInterval"`
	Streaming    struct {
		Enable             bool   `json:"enable"`
		ClusterID          string `json:"clusterID"`
		Async              bool   `json:"async"`
		MaxPubAcksInflight int    `json:"maxPubAcksInflight"`
	} `json:"streaming"`
}

// Validate - checks that the NATSArgs fields required for an enabled target
// are present.
//
// A disabled configuration is always valid. An enabled one needs a non-empty
// address and subject, and additionally a cluster ID when streaming is
// enabled. The first failing check is reported; nil means the arguments are
// acceptable.
func (n NATSArgs) Validate() error {
	// Cases are evaluated top to bottom, so a disabled target short-circuits
	// before any field is inspected.
	switch {
	case !n.Enable:
		return nil
	case n.Address.IsEmpty():
		return errors.New("empty address")
	case n.Subject == "":
		return errors.New("empty subject")
	case n.Streaming.Enable && n.Streaming.ClusterID == "":
		return errors.New("empty cluster id")
	}

	return nil
}

// NATSTarget - NATS target.
//
// A target built by NewNATSTarget holds exactly one live connection:
// stanConn when streaming is enabled in args, natsConn otherwise.
type NATSTarget struct {
	id       event.TargetID // identifier returned by ID
	args     NATSArgs       // configuration the target was created with
	natsConn *nats.Conn     // plain NATS connection; nil in streaming mode
	stanConn stan.Conn      // NATS Streaming connection; nil in plain mode
}

// ID - returns target ID.
//
// The value is the event.TargetID held in the target's id field.
func (target *NATSTarget) ID() event.TargetID {
	return target.id
}

// Save - sends the event directly without persisting it.
//
// The call delegates to send, so eventData is published immediately and
// whatever error send reports is returned to the caller.
func (target *NATSTarget) Save(eventData event.Event) error {
	return target.send(eventData)
}

// send - marshals eventData into an event.Log and publishes it on the
// configured subject.
//
// The log key is "<bucket>/<object>", where the object key is first
// query-unescaped. The message goes out over the streaming connection when
// one exists (asynchronously if args.Streaming.Async is set) and over the
// plain NATS connection otherwise. Unescape, marshal and publish errors are
// returned as-is.
func (target *NATSTarget) send(eventData event.Event) error {
	unescapedKey, err := url.QueryUnescape(eventData.S3.Object.Key)
	if err != nil {
		return err
	}

	payload, err := json.Marshal(event.Log{
		EventName: eventData.EventName,
		Key:       eventData.S3.Bucket.Name + "/" + unescapedKey,
		Records:   []event.Event{eventData},
	})
	if err != nil {
		return err
	}

	subject := target.args.Subject
	switch {
	case target.stanConn == nil:
		return target.natsConn.Publish(subject, payload)
	case target.args.Streaming.Async:
		// No ack handler is registered; only the immediate publish error
		// is reported.
		_, err = target.stanConn.PublishAsync(subject, payload, nil)
		return err
	default:
		return target.stanConn.Publish(subject, payload)
	}
}

// Send - no-op kept for interface compatibility.
//
// eventKey is ignored and nil is always returned; events are published from
// Save instead.
func (target *NATSTarget) Send(eventKey string) error {
	return nil
}

// Close - shuts down whichever connections to the NATS server are open.
//
// The streaming connection, if any, is closed first and its error becomes
// the return value. The plain NATS connection, if any, is closed afterwards;
// its Close reports no error.
func (target *NATSTarget) Close() (err error) {
	if streaming := target.stanConn; streaming != nil {
		err = streaming.Close()
	}

	if plain := target.natsConn; plain != nil {
		plain.Close()
	}

	return err
}

// NewNATSTarget - creates new NATS target.
//
// When args.Streaming.Enable is set, a NATS Streaming (stan) connection is
// opened against args.Streaming.ClusterID with a freshly generated client ID;
// otherwise a plain NATS connection is opened. Exactly one of the two
// connections is stored on the returned target.
//
// For streaming, the server URL is assembled with net/url so that reserved
// characters in the credentials are percent-encoded instead of corrupting
// the URL. Userinfo is attached only when credentials are configured:
// Username/Password take precedence, and Token is used as a username-only
// userinfo when no username or password is given.
//
// Errors from client ID generation or from connecting are returned
// unchanged together with a nil target.
func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) {
	target := &NATSTarget{
		id:   event.TargetID{ID: id, Name: "nats"},
		args: args,
	}

	if !args.Streaming.Enable {
		options := nats.DefaultOptions
		options.Url = "nats://" + args.Address.String()
		options.User = args.Username
		options.Password = args.Password
		options.Token = args.Token
		options.Secure = args.Secure

		natsConn, err := options.Connect()
		if err != nil {
			return nil, err
		}
		target.natsConn = natsConn
		return target, nil
	}

	serverURL := url.URL{Scheme: "nats", Host: args.Address.String()}
	if args.Secure {
		serverURL.Scheme = "tls"
	}
	switch {
	case args.Username != "" || args.Password != "":
		serverURL.User = url.UserPassword(args.Username, args.Password)
	case args.Token != "":
		// A userinfo without a password is how a token is carried in a
		// NATS server URL.
		serverURL.User = url.User(args.Token)
	}

	clientID, err := getNewUUID()
	if err != nil {
		return nil, err
	}

	connOpts := []stan.Option{stan.NatsURL(serverURL.String())}
	if args.Streaming.MaxPubAcksInflight > 0 {
		connOpts = append(connOpts, stan.MaxPubAcksInflight(args.Streaming.MaxPubAcksInflight))
	}

	stanConn, err := stan.Connect(args.Streaming.ClusterID, clientID, connOpts...)
	if err != nil {
		return nil, err
	}
	target.stanConn = stanConn
	return target, nil
}