// Copyright (c) 2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package store

import (
	"errors"
	"fmt"
	"sync"
)

// ErrBatchFull indicates that the batch is full
var ErrBatchFull = errors.New("batch is full")

type key interface {
	string | int | int64
}

// Batch represents an ordered batch
type Batch[K key, T any] struct {
	keys  []K
	items map[K]T
	limit uint32

	sync.Mutex
}

// Add adds the item to the batch
func (b *Batch[K, T]) Add(key K, item T) error {
	b.Lock()
	defer b.Unlock()

	if b.isFull() {
		return ErrBatchFull
	}

	if _, ok := b.items[key]; !ok {
		b.keys = append(b.keys, key)
	}
	b.items[key] = item

	return nil
}

// GetAll fetches the items and resets the batch
// Returned items are not referenced by the batch
func (b *Batch[K, T]) GetAll() (orderedKeys []K, orderedItems []T, err error) {
	b.Lock()
	defer b.Unlock()

	orderedKeys = append([]K(nil), b.keys...)
	for _, key := range orderedKeys {
		item, ok := b.items[key]
		if !ok {
			err = fmt.Errorf("item not found for the key: %v; should not happen;", key)
			return
		}
		orderedItems = append(orderedItems, item)
		delete(b.items, key)
	}

	b.keys = b.keys[:0]

	return
}

// GetByKey will get the batch item by the provided key
func (b *Batch[K, T]) GetByKey(key K) (T, bool) {
	b.Lock()
	defer b.Unlock()

	item, ok := b.items[key]
	return item, ok
}

// Len returns the no of items in the batch
func (b *Batch[K, T]) Len() int {
	b.Lock()
	defer b.Unlock()

	return len(b.keys)
}

// IsFull checks if the batch is full or not
func (b *Batch[K, T]) IsFull() bool {
	b.Lock()
	defer b.Unlock()

	return b.isFull()
}

func (b *Batch[K, T]) isFull() bool {
	return len(b.items) >= int(b.limit)
}

// NewBatch creates a new batch
func NewBatch[K key, T any](limit uint32) *Batch[K, T] {
	return &Batch[K, T]{
		keys:  make([]K, 0, limit),
		items: make(map[K]T, limit),
		limit: limit,
	}
}