minio/internal/store/batch.go

141 lines
2.9 KiB
Go
Raw Normal View History

// Copyright (c) 2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package store
import (
"context"
"errors"
"sync"
"time"
)
// ErrBatchFull indicates that the batch is full
var ErrBatchFull = errors.New("batch is full")
const defaultCommitTimeout = 30 * time.Second
// Batch represents an ordered batch
type Batch[I any] struct {
items []I
limit uint32
store Store[I]
quitCh chan struct{}
sync.Mutex
}
// BatchConfig represents the batch config
type BatchConfig[I any] struct {
Limit uint32
Store Store[I]
CommitTimeout time.Duration
Log logger
}
// Add adds the item to the batch
func (b *Batch[I]) Add(item I) error {
b.Lock()
defer b.Unlock()
if b.isFull() {
if b.store == nil {
return ErrBatchFull
}
// commit batch to store
if err := b.commit(); err != nil {
return err
}
}
b.items = append(b.items, item)
return nil
}
// Len returns the no of items in the batch
func (b *Batch[_]) Len() int {
b.Lock()
defer b.Unlock()
return len(b.items)
}
func (b *Batch[_]) isFull() bool {
return len(b.items) >= int(b.limit)
}
func (b *Batch[I]) commit() error {
switch len(b.items) {
case 0:
return nil
case 1:
_, err := b.store.Put(b.items[0])
return err
default:
}
if _, err := b.store.PutMultiple(b.items); err != nil {
return err
}
b.items = make([]I, 0, b.limit)
return nil
}
// Close commits the pending items and quits the goroutines
func (b *Batch[I]) Close() error {
defer func() {
close(b.quitCh)
}()
b.Lock()
defer b.Unlock()
return b.commit()
}
// NewBatch creates a new batch
func NewBatch[I any](config BatchConfig[I]) *Batch[I] {
if config.CommitTimeout == 0 {
config.CommitTimeout = defaultCommitTimeout
}
quitCh := make(chan struct{})
batch := &Batch[I]{
items: make([]I, 0, config.Limit),
limit: config.Limit,
store: config.Store,
quitCh: quitCh,
}
if batch.store != nil {
go func() {
commitTicker := time.NewTicker(config.CommitTimeout)
defer commitTicker.Stop()
for {
select {
case <-commitTicker.C:
case <-batch.quitCh:
return
}
batch.Lock()
err := batch.commit()
batch.Unlock()
if err != nil {
config.Log(context.Background(), err, "")
}
}
}()
}
return batch
}