// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/context"
)

// BulkProcessorService allows to easily process bulk requests. It allows setting
// policies when to flush new bulk requests, e.g. based on a number of actions,
// on the size of the actions, and/or to flush periodically. It also allows
// to control the number of concurrent bulk requests allowed to be executed
// in parallel.
//
// BulkProcessorService, by default, commits either every 1000 requests or when the
// (estimated) size of the bulk requests exceeds 5 MB. However, it does not
// commit periodically. BulkProcessorService also does retry by default, using
// an exponential backoff algorithm.
//
// The caller is responsible for setting the index and type on every
// bulk request added to BulkProcessorService.
//
// BulkProcessorService takes ideas from the BulkProcessor of the
// Elasticsearch Java API as documented in
// https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.
type BulkProcessorService struct {
	c              *Client
	beforeFn       BulkBeforeFunc
	afterFn        BulkAfterFunc
	name           string        // name of processor
	numWorkers     int           // # of workers (>= 1)
	bulkActions    int           // # of requests after which to commit
	bulkSize       int           // # of bytes after which to commit
	flushInterval  time.Duration // periodic flush interval
	wantStats      bool          // indicates whether to gather statistics
	initialTimeout time.Duration // initial wait time before retry on errors
	maxTimeout     time.Duration // max time to wait for retry on errors
}

// NewBulkProcessorService creates a new BulkProcessorService.
func NewBulkProcessorService(client *Client) *BulkProcessorService {
	return &BulkProcessorService{
		c:              client,
		numWorkers:     1,
		bulkActions:    1000,
		bulkSize:       5 << 20, // 5 MB
		initialTimeout: time.Duration(200) * time.Millisecond,
		maxTimeout:     time.Duration(10000) * time.Millisecond,
	}
}

// BulkBeforeFunc defines the signature of callbacks that are executed
// before a commit to Elasticsearch.
type BulkBeforeFunc func(executionId int64, requests []BulkableRequest)

// BulkAfterFunc defines the signature of callbacks that are executed
// after a commit to Elasticsearch. The err parameter signals an error.
type BulkAfterFunc func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error)

// Before specifies a function to be executed before bulk requests get comitted
// to Elasticsearch.
func (s *BulkProcessorService) Before(fn BulkBeforeFunc) *BulkProcessorService {
	s.beforeFn = fn
	return s
}

// After specifies a function to be executed when bulk requests have been
// comitted to Elasticsearch. The After callback executes both when the
// commit was successful as well as on failures.
func (s *BulkProcessorService) After(fn BulkAfterFunc) *BulkProcessorService {
	s.afterFn = fn
	return s
}

// Name is an optional name to identify this bulk processor.
func (s *BulkProcessorService) Name(name string) *BulkProcessorService {
	s.name = name
	return s
}

// Workers is the number of concurrent workers allowed to be
// executed. Defaults to 1 and must be greater or equal to 1.
func (s *BulkProcessorService) Workers(num int) *BulkProcessorService {
	s.numWorkers = num
	return s
}

// BulkActions specifies when to flush based on the number of actions
// currently added. Defaults to 1000 and can be set to -1 to be disabled.
func (s *BulkProcessorService) BulkActions(bulkActions int) *BulkProcessorService {
	s.bulkActions = bulkActions
	return s
}

// BulkSize specifies when to flush based on the size (in bytes) of the actions
// currently added. Defaults to 5 MB and can be set to -1 to be disabled.
func (s *BulkProcessorService) BulkSize(bulkSize int) *BulkProcessorService {
	s.bulkSize = bulkSize
	return s
}

// FlushInterval specifies when to flush at the end of the given interval.
// This is disabled by default. If you want the bulk processor to
// operate completely asynchronously, set both BulkActions and BulkSize to
// -1 and set the FlushInterval to a meaningful interval.
func (s *BulkProcessorService) FlushInterval(interval time.Duration) *BulkProcessorService {
	s.flushInterval = interval
	return s
}

// Stats tells bulk processor to gather stats while running.
// Use Stats to return the stats. This is disabled by default.
func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
	s.wantStats = wantStats
	return s
}

// Do creates a new BulkProcessor and starts it.
// Consider the BulkProcessor as a running instance that accepts bulk requests
// and commits them to Elasticsearch, spreading the work across one or more
// workers.
//
// You can interoperate with the BulkProcessor returned by Do, e.g. Start and
// Stop (or Close) it.
//
// Context is an optional context that is passed into the bulk request
// service calls. In contrast to other operations, this context is used in
// a long running process. You could use it to pass e.g. loggers, but you
// shouldn't use it for cancellation.
//
// Calling Do several times returns new BulkProcessors. You probably don't
// want to do this. BulkProcessorService implements just a builder pattern.
func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) {
	p := newBulkProcessor(
		s.c,
		s.beforeFn,
		s.afterFn,
		s.name,
		s.numWorkers,
		s.bulkActions,
		s.bulkSize,
		s.flushInterval,
		s.wantStats,
		s.initialTimeout,
		s.maxTimeout)

	err := p.Start(ctx)
	if err != nil {
		return nil, err
	}
	return p, nil
}

// -- Bulk Processor Statistics --

// BulkProcessorStats contains various statistics of a bulk processor
// while it is running. Use the Stats func to return it while running.
type BulkProcessorStats struct {
	Flushed   int64 // number of times the flush interval has been invoked
	Committed int64 // # of times workers committed bulk requests
	Indexed   int64 // # of requests indexed
	Created   int64 // # of requests that ES reported as creates (201)
	Updated   int64 // # of requests that ES reported as updates
	Deleted   int64 // # of requests that ES reported as deletes
	Succeeded int64 // # of requests that ES reported as successful
	Failed    int64 // # of requests that ES reported as failed

	Workers []*BulkProcessorWorkerStats // stats for each worker
}

// BulkProcessorWorkerStats represents per-worker statistics.
type BulkProcessorWorkerStats struct {
	Queued       int64         // # of requests queued in this worker
	LastDuration time.Duration // duration of last commit
}

// newBulkProcessorStats initializes and returns a BulkProcessorStats struct.
func newBulkProcessorStats(workers int) *BulkProcessorStats {
	stats := &BulkProcessorStats{
		Workers: make([]*BulkProcessorWorkerStats, workers),
	}
	for i := 0; i < workers; i++ {
		stats.Workers[i] = &BulkProcessorWorkerStats{}
	}
	return stats
}

func (st *BulkProcessorStats) dup() *BulkProcessorStats {
	dst := new(BulkProcessorStats)
	dst.Flushed = st.Flushed
	dst.Committed = st.Committed
	dst.Indexed = st.Indexed
	dst.Created = st.Created
	dst.Updated = st.Updated
	dst.Deleted = st.Deleted
	dst.Succeeded = st.Succeeded
	dst.Failed = st.Failed
	for _, src := range st.Workers {
		dst.Workers = append(dst.Workers, src.dup())
	}
	return dst
}

func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats {
	dst := new(BulkProcessorWorkerStats)
	dst.Queued = st.Queued
	dst.LastDuration = st.LastDuration
	return dst
}

// -- Bulk Processor --

// BulkProcessor encapsulates a task that accepts bulk requests and
// orchestrates committing them to Elasticsearch via one or more workers.
//
// BulkProcessor is returned by setting up a BulkProcessorService and
// calling the Do method.
type BulkProcessor struct {
	c              *Client
	beforeFn       BulkBeforeFunc
	afterFn        BulkAfterFunc
	name           string
	bulkActions    int
	bulkSize       int
	numWorkers     int
	executionId    int64
	requestsC      chan BulkableRequest
	workerWg       sync.WaitGroup
	workers        []*bulkWorker
	flushInterval  time.Duration
	flusherStopC   chan struct{}
	wantStats      bool
	initialTimeout time.Duration // initial wait time before retry on errors
	maxTimeout     time.Duration // max time to wait for retry on errors

	startedMu sync.Mutex // guards the following block
	started   bool

	statsMu sync.Mutex // guards the following block
	stats   *BulkProcessorStats
}

func newBulkProcessor(
	client *Client,
	beforeFn BulkBeforeFunc,
	afterFn BulkAfterFunc,
	name string,
	numWorkers int,
	bulkActions int,
	bulkSize int,
	flushInterval time.Duration,
	wantStats bool,
	initialTimeout time.Duration,
	maxTimeout time.Duration) *BulkProcessor {
	return &BulkProcessor{
		c:              client,
		beforeFn:       beforeFn,
		afterFn:        afterFn,
		name:           name,
		numWorkers:     numWorkers,
		bulkActions:    bulkActions,
		bulkSize:       bulkSize,
		flushInterval:  flushInterval,
		wantStats:      wantStats,
		initialTimeout: initialTimeout,
		maxTimeout:     maxTimeout,
	}
}

// Start starts the bulk processor. If the processor is already started,
// nil is returned.
func (p *BulkProcessor) Start(ctx context.Context) error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	if p.started {
		return nil
	}

	// We must have at least one worker.
	if p.numWorkers < 1 {
		p.numWorkers = 1
	}

	p.requestsC = make(chan BulkableRequest)
	p.executionId = 0
	p.stats = newBulkProcessorStats(p.numWorkers)

	// Create and start up workers.
	p.workers = make([]*bulkWorker, p.numWorkers)
	for i := 0; i < p.numWorkers; i++ {
		p.workerWg.Add(1)
		p.workers[i] = newBulkWorker(p, i)
		go p.workers[i].work(ctx)
	}

	// Start the ticker for flush (if enabled)
	if int64(p.flushInterval) > 0 {
		p.flusherStopC = make(chan struct{})
		go p.flusher(p.flushInterval)
	}

	p.started = true

	return nil
}

// Stop is an alias for Close.
func (p *BulkProcessor) Stop() error {
	return p.Close()
}

// Close stops the bulk processor previously started with Do.
// If it is already stopped, this is a no-op and nil is returned.
//
// By implementing Close, BulkProcessor implements the io.Closer interface.
func (p *BulkProcessor) Close() error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	// Already stopped? Do nothing.
	if !p.started {
		return nil
	}

	// Stop flusher (if enabled)
	if p.flusherStopC != nil {
		p.flusherStopC <- struct{}{}
		<-p.flusherStopC
		close(p.flusherStopC)
		p.flusherStopC = nil
	}

	// Stop all workers.
	close(p.requestsC)
	p.workerWg.Wait()

	p.started = false

	return nil
}

// Stats returns the latest bulk processor statistics.
// Collecting stats must be enabled first by calling Stats(true) on
// the service that created this processor.
func (p *BulkProcessor) Stats() BulkProcessorStats {
	p.statsMu.Lock()
	defer p.statsMu.Unlock()
	return *p.stats.dup()
}

// Add adds a single request to commit by the BulkProcessorService.
//
// The caller is responsible for setting the index and type on the request.
func (p *BulkProcessor) Add(request BulkableRequest) {
	p.requestsC <- request
}

// Flush manually asks all workers to commit their outstanding requests.
// It returns only when all workers acknowledge completion.
func (p *BulkProcessor) Flush() error {
	p.statsMu.Lock()
	p.stats.Flushed++
	p.statsMu.Unlock()

	for _, w := range p.workers {
		w.flushC <- struct{}{}
		<-w.flushAckC // wait for completion
	}
	return nil
}

// flusher is a single goroutine that periodically asks all workers to
// commit their outstanding bulk requests. It is only started if
// FlushInterval is greater than 0.
func (p *BulkProcessor) flusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C: // Periodic flush
			p.Flush() // TODO swallow errors here?

		case <-p.flusherStopC:
			p.flusherStopC <- struct{}{}
			return
		}
	}
}

// -- Bulk Worker --

// bulkWorker encapsulates a single worker, running in a goroutine,
// receiving bulk requests and eventually committing them to Elasticsearch.
// It is strongly bound to a BulkProcessor.
type bulkWorker struct {
	p           *BulkProcessor
	i           int
	bulkActions int
	bulkSize    int
	service     *BulkService
	flushC      chan struct{}
	flushAckC   chan struct{}
}

// newBulkWorker creates a new bulkWorker instance.
func newBulkWorker(p *BulkProcessor, i int) *bulkWorker {
	return &bulkWorker{
		p:           p,
		i:           i,
		bulkActions: p.bulkActions,
		bulkSize:    p.bulkSize,
		service:     NewBulkService(p.c),
		flushC:      make(chan struct{}),
		flushAckC:   make(chan struct{}),
	}
}

// work waits for bulk requests and manual flush calls on the respective
// channels and is invoked as a goroutine when the bulk processor is started.
func (w *bulkWorker) work(ctx context.Context) {
	defer func() {
		w.p.workerWg.Done()
		close(w.flushAckC)
		close(w.flushC)
	}()

	var stop bool
	for !stop {
		select {
		case req, open := <-w.p.requestsC:
			if open {
				// Received a new request
				w.service.Add(req)
				if w.commitRequired() {
					w.commit(ctx) // TODO swallow errors here?
				}
			} else {
				// Channel closed: Stop.
				stop = true
				if w.service.NumberOfActions() > 0 {
					w.commit(ctx) // TODO swallow errors here?
				}
			}

		case <-w.flushC:
			// Commit outstanding requests
			if w.service.NumberOfActions() > 0 {
				w.commit(ctx) // TODO swallow errors here?
			}
			w.flushAckC <- struct{}{}
		}
	}
}

// commit commits the bulk requests in the given service,
// invoking callbacks as specified.
func (w *bulkWorker) commit(ctx context.Context) error {
	var res *BulkResponse

	// commitFunc will commit bulk requests and, on failure, be retried
	// via exponential backoff
	commitFunc := func() error {
		var err error
		res, err = w.service.Do(ctx)
		return err
	}
	// notifyFunc will be called if retry fails
	notifyFunc := func(err error) {
		w.p.c.errorf("elastic: bulk processor %q failed but will retry: %v", w.p.name, err)
	}

	id := atomic.AddInt64(&w.p.executionId, 1)

	// Update # documents in queue before eventual retries
	w.p.statsMu.Lock()
	if w.p.wantStats {
		w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
	}
	w.p.statsMu.Unlock()

	// Save requests because they will be reset in commitFunc
	reqs := w.service.requests

	// Invoke before callback
	if w.p.beforeFn != nil {
		w.p.beforeFn(id, reqs)
	}

	// Commit bulk requests
	policy := NewExponentialBackoff(w.p.initialTimeout, w.p.maxTimeout)
	err := RetryNotify(commitFunc, policy, notifyFunc)
	w.updateStats(res)
	if err != nil {
		w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
	}

	// Invoke after callback
	if w.p.afterFn != nil {
		w.p.afterFn(id, reqs, res, err)
	}

	return err
}

func (w *bulkWorker) updateStats(res *BulkResponse) {
	// Update stats
	if res != nil {
		w.p.statsMu.Lock()
		if w.p.wantStats {
			w.p.stats.Committed++
			if res != nil {
				w.p.stats.Indexed += int64(len(res.Indexed()))
				w.p.stats.Created += int64(len(res.Created()))
				w.p.stats.Updated += int64(len(res.Updated()))
				w.p.stats.Deleted += int64(len(res.Deleted()))
				w.p.stats.Succeeded += int64(len(res.Succeeded()))
				w.p.stats.Failed += int64(len(res.Failed()))
			}
			w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
			w.p.stats.Workers[w.i].LastDuration = time.Duration(int64(res.Took)) * time.Millisecond
		}
		w.p.statsMu.Unlock()
	}
}

// commitRequired returns true if the service has to commit its
// bulk requests. This can be either because the number of actions
// or the estimated size in bytes is larger than specified in the
// BulkProcessorService.
func (w *bulkWorker) commitRequired() bool {
	if w.bulkActions >= 0 && w.service.NumberOfActions() >= w.bulkActions {
		return true
	}
	if w.bulkSize >= 0 && w.service.EstimatedSizeInBytes() >= int64(w.bulkSize) {
		return true
	}
	return false
}