// Copyright 2016 Apcera Inc. All rights reserved.

// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io).
package stan

import (
	"errors"
	"sync"
	"time"

	"github.com/nats-io/go-nats"
	"github.com/nats-io/go-nats-streaming/pb"
)

const (
	// DefaultAckWait indicates how long the server should wait for an ACK before resending a message
	DefaultAckWait = 30 * time.Second
	// DefaultMaxInflight indicates how many messages with outstanding ACKs the server can send
	DefaultMaxInflight = 1024
)

// Msg is the client defined message, which includes proto, then back link to subscription.
type Msg struct {
	pb.MsgProto // MsgProto: Seq, Subject, Reply[opt], Data, Timestamp, CRC32[opt]
	Sub         Subscription
}

// Subscriptions and Options

// Subscription represents a subscription within the NATS Streaming cluster. Subscriptions
// will be rate matched and follow at-least delivery semantics.
type Subscription interface {
	ClearMaxPending() error
	Delivered() (int64, error)
	Dropped() (int, error)
	IsValid() bool
	MaxPending() (int, int, error)
	Pending() (int, int, error)
	PendingLimits() (int, int, error)
	SetPendingLimits(msgLimit, bytesLimit int) error
	// Unsubscribe removes interest in the subscription.
	// For durables, it means that the durable interest is also removed from
	// the server. Restarting a durable with the same name will not resume
	// the subscription, it will be considered a new one.
	Unsubscribe() error

	// Close removes this subscriber from the server, but unlike Unsubscribe(),
	// the durable interest is not removed. If the client has connected to a server
	// for which this feature is not available, Close() will return a ErrNoServerSupport
	// error.
	Close() error
}

// A subscription represents a subscription to a stan cluster.
type subscription struct {
	sync.RWMutex
	sc       *conn
	subject  string
	qgroup   string
	inbox    string
	ackInbox string
	inboxSub *nats.Subscription
	opts     SubscriptionOptions
	cb       MsgHandler
}

// SubscriptionOption is a function on the options for a subscription.
type SubscriptionOption func(*SubscriptionOptions) error

// MsgHandler is a callback function that processes messages delivered to
// asynchronous subscribers.
type MsgHandler func(msg *Msg)

// SubscriptionOptions are used to control the Subscription's behavior.
type SubscriptionOptions struct {
	// DurableName, if set will survive client restarts.
	DurableName string
	// Controls the number of messages the cluster will have inflight without an ACK.
	MaxInflight int
	// Controls the time the cluster will wait for an ACK for a given message.
	AckWait time.Duration
	// StartPosition enum from proto.
	StartAt pb.StartPosition
	// Optional start sequence number.
	StartSequence uint64
	// Optional start time.
	StartTime time.Time
	// Option to do Manual Acks
	ManualAcks bool
}

// DefaultSubscriptionOptions are the default subscriptions' options
var DefaultSubscriptionOptions = SubscriptionOptions{
	MaxInflight: DefaultMaxInflight,
	AckWait:     DefaultAckWait,
}

// MaxInflight is an Option to set the maximum number of messages the cluster will send
// without an ACK.
func MaxInflight(m int) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.MaxInflight = m
		return nil
	}
}

// AckWait is an Option to set the timeout for waiting for an ACK from the cluster's
// point of view for delivered messages.
func AckWait(t time.Duration) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.AckWait = t
		return nil
	}
}

// StartAt sets the desired start position for the message stream.
func StartAt(sp pb.StartPosition) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = sp
		return nil
	}
}

// StartAtSequence sets the desired start sequence position and state.
func StartAtSequence(seq uint64) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = pb.StartPosition_SequenceStart
		o.StartSequence = seq
		return nil
	}
}

// StartAtTime sets the desired start time position and state.
func StartAtTime(start time.Time) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = pb.StartPosition_TimeDeltaStart
		o.StartTime = start
		return nil
	}
}

// StartAtTimeDelta sets the desired start time position and state using the delta.
func StartAtTimeDelta(ago time.Duration) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = pb.StartPosition_TimeDeltaStart
		o.StartTime = time.Now().Add(-ago)
		return nil
	}
}

// StartWithLastReceived is a helper function to set start position to last received.
func StartWithLastReceived() SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = pb.StartPosition_LastReceived
		return nil
	}
}

// DeliverAllAvailable will deliver all messages available.
func DeliverAllAvailable() SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.StartAt = pb.StartPosition_First
		return nil
	}
}

// SetManualAckMode will allow clients to control their own acks to delivered messages.
func SetManualAckMode() SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.ManualAcks = true
		return nil
	}
}

// DurableName sets the DurableName for the subcriber.
func DurableName(name string) SubscriptionOption {
	return func(o *SubscriptionOptions) error {
		o.DurableName = name
		return nil
	}
}

// Subscribe will perform a subscription with the given options to the NATS Streaming cluster.
func (sc *conn) Subscribe(subject string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
	return sc.subscribe(subject, "", cb, options...)
}

// QueueSubscribe will perform a queue subscription with the given options to the NATS Streaming cluster.
func (sc *conn) QueueSubscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
	return sc.subscribe(subject, qgroup, cb, options...)
}

// subscribe will perform a subscription with the given options to the NATS Streaming cluster.
func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
	sub := &subscription{subject: subject, qgroup: qgroup, inbox: nats.NewInbox(), cb: cb, sc: sc, opts: DefaultSubscriptionOptions}
	for _, opt := range options {
		if err := opt(&sub.opts); err != nil {
			return nil, err
		}
	}
	sc.Lock()
	if sc.nc == nil {
		sc.Unlock()
		return nil, ErrConnectionClosed
	}

	// Register subscription.
	sc.subMap[sub.inbox] = sub
	nc := sc.nc
	sc.Unlock()

	// Hold lock throughout.
	sub.Lock()
	defer sub.Unlock()

	// Listen for actual messages.
	nsub, err := nc.Subscribe(sub.inbox, sc.processMsg)
	if err != nil {
		return nil, err
	}
	sub.inboxSub = nsub

	// Create a subscription request
	// FIXME(dlc) add others.
	sr := &pb.SubscriptionRequest{
		ClientID:      sc.clientID,
		Subject:       subject,
		QGroup:        qgroup,
		Inbox:         sub.inbox,
		MaxInFlight:   int32(sub.opts.MaxInflight),
		AckWaitInSecs: int32(sub.opts.AckWait / time.Second),
		StartPosition: sub.opts.StartAt,
		DurableName:   sub.opts.DurableName,
	}

	// Conditionals
	switch sr.StartPosition {
	case pb.StartPosition_TimeDeltaStart:
		sr.StartTimeDelta = time.Now().UnixNano() - sub.opts.StartTime.UnixNano()
	case pb.StartPosition_SequenceStart:
		sr.StartSequence = sub.opts.StartSequence
	}

	b, _ := sr.Marshal()
	reply, err := sc.nc.Request(sc.subRequests, b, sc.opts.ConnectTimeout)
	if err != nil {
		sub.inboxSub.Unsubscribe()
		if err == nats.ErrTimeout {
			err = ErrSubReqTimeout
		}
		return nil, err
	}
	r := &pb.SubscriptionResponse{}
	if err := r.Unmarshal(reply.Data); err != nil {
		sub.inboxSub.Unsubscribe()
		return nil, err
	}
	if r.Error != "" {
		sub.inboxSub.Unsubscribe()
		return nil, errors.New(r.Error)
	}
	sub.ackInbox = r.AckInbox

	return sub, nil
}

// ClearMaxPending resets the maximums seen so far.
func (sub *subscription) ClearMaxPending() error {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return ErrBadSubscription
	}
	return sub.inboxSub.ClearMaxPending()
}

// Delivered returns the number of delivered messages for this subscription.
func (sub *subscription) Delivered() (int64, error) {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return -1, ErrBadSubscription
	}
	return sub.inboxSub.Delivered()
}

// Dropped returns the number of known dropped messages for this subscription.
// This will correspond to messages dropped by violations of PendingLimits. If
// the server declares the connection a SlowConsumer, this number may not be
// valid.
func (sub *subscription) Dropped() (int, error) {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return -1, ErrBadSubscription
	}
	return sub.inboxSub.Dropped()
}

// IsValid returns a boolean indicating whether the subscription
// is still active. This will return false if the subscription has
// already been closed.
func (sub *subscription) IsValid() bool {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return false
	}
	return sub.inboxSub.IsValid()
}

// MaxPending returns the maximum number of queued messages and queued bytes seen so far.
func (sub *subscription) MaxPending() (int, int, error) {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return -1, -1, ErrBadSubscription
	}
	return sub.inboxSub.MaxPending()
}

// Pending returns the number of queued messages and queued bytes in the client for this subscription.
func (sub *subscription) Pending() (int, int, error) {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return -1, -1, ErrBadSubscription
	}
	return sub.inboxSub.Pending()
}

// PendingLimits returns the current limits for this subscription.
// If no error is returned, a negative value indicates that the
// given metric is not limited.
func (sub *subscription) PendingLimits() (int, int, error) {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return -1, -1, ErrBadSubscription
	}
	return sub.inboxSub.PendingLimits()
}

// SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
// Zero is not allowed. Any negative value means that the given metric is not limited.
func (sub *subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
	sub.Lock()
	defer sub.Unlock()
	if sub.inboxSub == nil {
		return ErrBadSubscription
	}
	return sub.inboxSub.SetPendingLimits(msgLimit, bytesLimit)
}

// closeOrUnsubscribe performs either close or unsubsribe based on
// given boolean.
func (sub *subscription) closeOrUnsubscribe(doClose bool) error {
	if sub == nil {
		return ErrBadSubscription
	}
	sub.Lock()
	sc := sub.sc
	if sc == nil {
		// Already closed.
		sub.Unlock()
		return ErrBadSubscription
	}
	sub.sc = nil
	sub.inboxSub.Unsubscribe()
	sub.inboxSub = nil
	sub.Unlock()

	if sc == nil {
		return ErrBadSubscription
	}

	sc.Lock()
	if sc.nc == nil {
		sc.Unlock()
		return ErrConnectionClosed
	}

	delete(sc.subMap, sub.inbox)
	reqSubject := sc.unsubRequests
	if doClose {
		reqSubject = sc.subCloseRequests
		if reqSubject == "" {
			sc.Unlock()
			return ErrNoServerSupport
		}
	}

	// Snapshot connection to avoid data race, since the connection may be
	// closing while we try to send the request
	nc := sc.nc
	sc.Unlock()

	usr := &pb.UnsubscribeRequest{
		ClientID: sc.clientID,
		Subject:  sub.subject,
		Inbox:    sub.ackInbox,
	}
	b, _ := usr.Marshal()
	reply, err := nc.Request(reqSubject, b, sc.opts.ConnectTimeout)
	if err != nil {
		if err == nats.ErrTimeout {
			if doClose {
				return ErrCloseReqTimeout
			}
			return ErrUnsubReqTimeout
		}
		return err
	}
	r := &pb.SubscriptionResponse{}
	if err := r.Unmarshal(reply.Data); err != nil {
		return err
	}
	if r.Error != "" {
		return errors.New(r.Error)
	}

	return nil
}

// Unsubscribe implements the Subscription interface
func (sub *subscription) Unsubscribe() error {
	return sub.closeOrUnsubscribe(false)
}

// Close implements the Subscription interface
func (sub *subscription) Close() error {
	return sub.closeOrUnsubscribe(true)
}

// Ack manually acknowledges a message.
// The subscriber had to be created with SetManualAckMode() option.
func (msg *Msg) Ack() error {
	if msg == nil {
		return ErrNilMsg
	}
	// Look up subscription
	sub := msg.Sub.(*subscription)
	if sub == nil {
		return ErrBadSubscription
	}

	sub.RLock()
	ackSubject := sub.ackInbox
	isManualAck := sub.opts.ManualAcks
	sc := sub.sc
	sub.RUnlock()

	// Check for error conditions.
	if sc == nil {
		return ErrBadSubscription
	}
	// Get nc from the connection (needs locking to avoid race)
	sc.RLock()
	nc := sc.nc
	sc.RUnlock()
	if nc == nil {
		return ErrBadConnection
	}
	if !isManualAck {
		return ErrManualAck
	}

	// Ack here.
	ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence}
	b, _ := ack.Marshal()
	return nc.Publish(ackSubject, b)
}