diff --git a/internal/config/notify/help.go b/internal/config/notify/help.go
index 4d07af917..343f46c7b 100644
--- a/internal/config/notify/help.go
+++ b/internal/config/notify/help.go
@@ -274,6 +274,18 @@ var (
 			Optional:    true,
 			Type:        "number",
 		},
+		config.HelpKV{
+			Key:         target.KafkaBatchSize,
+			Description: "batch size of the events; used only when queue_dir is set",
+			Optional:    true,
+			Type:        "number",
+		},
+		config.HelpKV{
+			Key:         target.KafkaBatchCommitTimeout,
+			Description: "commit timeout set for the batch; used only when batch_size > 1",
+			Optional:    true,
+			Type:        "duration",
+		},
 	}
 
 	HelpMQTT = config.HelpKVS{
diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go
index 75091a67c..0f7dc4404 100644
--- a/internal/config/notify/parse.go
+++ b/internal/config/notify/parse.go
@@ -374,6 +374,10 @@ var (
 			Key:   target.KafkaBatchSize,
 			Value: "0",
 		},
+		config.KV{
+			Key:   target.KafkaBatchCommitTimeout,
+			Value: "0s",
+		},
 		config.KV{
 			Key:   target.KafkaCompressionCodec,
 			Value: "",
@@ -463,14 +467,23 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
 			return nil, err
 		}
 
+		batchCommitTimeoutEnv := target.EnvKafkaBatchCommitTimeout
+		if k != config.Default {
+			batchCommitTimeoutEnv = batchCommitTimeoutEnv + config.Default + k
+		}
+		batchCommitTimeout, err := time.ParseDuration(env.Get(batchCommitTimeoutEnv, kv.Get(target.KafkaBatchCommitTimeout)))
+		if err != nil {
+			return nil, err
+		}
 		kafkaArgs := target.KafkaArgs{
-			Enable:     enabled,
-			Brokers:    brokers,
-			Topic:      env.Get(topicEnv, kv.Get(target.KafkaTopic)),
-			QueueDir:   env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)),
-			QueueLimit: queueLimit,
-			Version:    env.Get(versionEnv, kv.Get(target.KafkaVersion)),
-			BatchSize:  uint32(batchSize),
+			Enable:             enabled,
+			Brokers:            brokers,
+			Topic:              env.Get(topicEnv, kv.Get(target.KafkaTopic)),
+			QueueDir:           env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)),
+			QueueLimit:         queueLimit,
+			Version:            env.Get(versionEnv, kv.Get(target.KafkaVersion)),
+			BatchSize:          uint32(batchSize),
+			BatchCommitTimeout: batchCommitTimeout,
 		}
 
 		tlsEnableEnv := target.EnvKafkaTLS
diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go
index 86f48f21a..c987088da 100644
--- a/internal/event/target/amqp.go
+++ b/internal/event/target/amqp.go
@@ -276,7 +276,8 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi
 // Save - saves the events to the store which will be replayed when the amqp connection is active.
 func (target *AMQPTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -302,7 +303,7 @@ func (target *AMQPTarget) SendFromStore(key store.Key) error {
 	}
 	defer ch.Close()
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -317,7 +318,7 @@
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - does nothing and available for interface compatibility.
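
For context on the parsing added in `GetNotifyKafka` above: the environment variable overrides the stored config value, and the result goes through `time.ParseDuration`. A standalone sketch of that precedence, assuming nothing beyond the env-var name from the patch — `resolveDuration` is a hypothetical helper standing in for MinIO's `env.Get`:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// resolveDuration applies the same precedence GetNotifyKafka uses via
// env.Get: an environment override wins, otherwise the stored config
// value is parsed.
func resolveDuration(envKey, configValue string) (time.Duration, error) {
	value := configValue
	if override, ok := os.LookupEnv(envKey); ok {
		value = override
	}
	return time.ParseDuration(value)
}

func main() {
	os.Setenv("MINIO_NOTIFY_KAFKA_BATCH_COMMIT_TIMEOUT", "30s")
	d, err := resolveDuration("MINIO_NOTIFY_KAFKA_BATCH_COMMIT_TIMEOUT", "0s")
	if err != nil {
		fmt.Println("invalid duration:", err)
		return
	}
	fmt.Println("batch commit timeout:", d) // batch commit timeout: 30s
}
```
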
diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go
index 69116d39a..35532334e 100644
--- a/internal/event/target/elasticsearch.go
+++ b/internal/event/target/elasticsearch.go
@@ -202,7 +202,8 @@ func (target *ElasticsearchTarget) isActive() (bool, error) {
 // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
 func (target *ElasticsearchTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -278,7 +279,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -296,7 +297,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - does nothing and available for interface compatibility.
diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go
index 2ebddd321..d6af69b8b 100644
--- a/internal/event/target/kafka.go
+++ b/internal/event/target/kafka.go
@@ -43,23 +43,24 @@ import (
 
 // Kafka input constants
 const (
-	KafkaBrokers          = "brokers"
-	KafkaTopic            = "topic"
-	KafkaQueueDir         = "queue_dir"
-	KafkaQueueLimit       = "queue_limit"
-	KafkaTLS              = "tls"
-	KafkaTLSSkipVerify    = "tls_skip_verify"
-	KafkaTLSClientAuth    = "tls_client_auth"
-	KafkaSASL             = "sasl"
-	KafkaSASLUsername     = "sasl_username"
-	KafkaSASLPassword     = "sasl_password"
-	KafkaSASLMechanism    = "sasl_mechanism"
-	KafkaClientTLSCert    = "client_tls_cert"
-	KafkaClientTLSKey     = "client_tls_key"
-	KafkaVersion          = "version"
-	KafkaBatchSize        = "batch_size"
-	KafkaCompressionCodec = "compression_codec"
-	KafkaCompressionLevel = "compression_level"
+	KafkaBrokers            = "brokers"
+	KafkaTopic              = "topic"
+	KafkaQueueDir           = "queue_dir"
+	KafkaQueueLimit         = "queue_limit"
+	KafkaTLS                = "tls"
+	KafkaTLSSkipVerify      = "tls_skip_verify"
+	KafkaTLSClientAuth      = "tls_client_auth"
+	KafkaSASL               = "sasl"
+	KafkaSASLUsername       = "sasl_username"
+	KafkaSASLPassword       = "sasl_password"
+	KafkaSASLMechanism      = "sasl_mechanism"
+	KafkaClientTLSCert      = "client_tls_cert"
+	KafkaClientTLSKey       = "client_tls_key"
+	KafkaVersion            = "version"
+	KafkaBatchSize          = "batch_size"
+	KafkaBatchCommitTimeout = "batch_commit_timeout"
+	KafkaCompressionCodec   = "compression_codec"
+	KafkaCompressionLevel   = "compression_level"
 
 	EnvKafkaEnable                   = "MINIO_NOTIFY_KAFKA_ENABLE"
 	EnvKafkaBrokers                  = "MINIO_NOTIFY_KAFKA_BROKERS"
@@ -77,6 +78,7 @@ const (
 	EnvKafkaClientTLSKey             = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
 	EnvKafkaVersion                  = "MINIO_NOTIFY_KAFKA_VERSION"
 	EnvKafkaBatchSize                = "MINIO_NOTIFY_KAFKA_BATCH_SIZE"
+	EnvKafkaBatchCommitTimeout       = "MINIO_NOTIFY_KAFKA_BATCH_COMMIT_TIMEOUT"
 	EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC"
 	EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL"
 )
@@ -91,14 +93,15 @@ var codecs = map[string]sarama.CompressionCodec{
 
 // KafkaArgs - Kafka target arguments.
 type KafkaArgs struct {
-	Enable     bool        `json:"enable"`
-	Brokers    []xnet.Host `json:"brokers"`
-	Topic      string      `json:"topic"`
-	QueueDir   string      `json:"queueDir"`
-	QueueLimit uint64      `json:"queueLimit"`
-	Version    string      `json:"version"`
-	BatchSize  uint32      `json:"batchSize"`
-	TLS        struct {
+	Enable             bool          `json:"enable"`
+	Brokers            []xnet.Host   `json:"brokers"`
+	Topic              string        `json:"topic"`
+	QueueDir           string        `json:"queueDir"`
+	QueueLimit         uint64        `json:"queueLimit"`
+	Version            string        `json:"version"`
+	BatchSize          uint32        `json:"batchSize"`
+	BatchCommitTimeout time.Duration `json:"batchCommitTimeout"`
+	TLS                struct {
 		Enable     bool           `json:"enable"`
 		RootCAs    *x509.CertPool `json:"-"`
 		SkipVerify bool           `json:"skipVerify"`
@@ -146,6 +149,11 @@ func (k KafkaArgs) Validate() error {
 			return errors.New("batch should be enabled only if queue dir is enabled")
 		}
 	}
+	if k.BatchCommitTimeout > 0 {
+		if k.QueueDir == "" || k.BatchSize <= 1 {
+			return errors.New("batch commit timeout should be set only if queue dir is enabled and batch size > 1")
+		}
+	}
 	return nil
 }
 
@@ -159,7 +167,7 @@ type KafkaTarget struct {
 	producer   sarama.SyncProducer
 	config     *sarama.Config
 	store      store.Store[event.Event]
-	batch      *store.Batch[string, *sarama.ProducerMessage]
+	batch      *store.Batch[event.Event]
 	loggerOnce logger.LogOnce
 	quitCh     chan struct{}
 }
@@ -199,7 +207,11 @@ func (target *KafkaTarget) isActive() (bool, error) {
 // Save - saves the events to the store which will be replayed when the Kafka connection is active.
 func (target *KafkaTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		if target.batch != nil {
+			return target.batch.Add(eventData)
+		}
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -220,80 +232,59 @@ func (target *KafkaTarget) send(eventData event.Event) error {
 	return err
 }
 
-// SendFromStore - reads an event from store and sends it to Kafka.
-func (target *KafkaTarget) SendFromStore(key store.Key) error {
-	if err := target.init(); err != nil {
-		return err
+// sendMultiple sends multiple messages to Kafka.
+func (target *KafkaTarget) sendMultiple(events []event.Event) error {
+	if target.producer == nil {
+		return store.ErrNotConnected
 	}
-
-	// If batch is enabled, the event will be batched in memory
-	// and will be committed once the batch is full.
-	if target.batch != nil {
-		return target.addToBatch(key)
-	}
-
-	eventData, eErr := target.store.Get(key.Name)
-	if eErr != nil {
-		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
-		// Such events will not exist and wouldve been already been sent successfully.
-		if os.IsNotExist(eErr) {
-			return nil
-		}
-		return eErr
-	}
-
-	if err := target.send(eventData); err != nil {
-		if isKafkaConnErr(err) {
-			return store.ErrNotConnected
-		}
-		return err
-	}
-
-	// Delete the event from store.
-	return target.store.Del(key.Name)
-}
-
-func (target *KafkaTarget) addToBatch(key store.Key) error {
-	if target.batch.IsFull() {
-		if err := target.commitBatch(); err != nil {
+	var msgs []*sarama.ProducerMessage
+	for _, event := range events {
+		msg, err := target.toProducerMessage(event)
+		if err != nil {
 			return err
 		}
+		msgs = append(msgs, msg)
 	}
-	if _, ok := target.batch.GetByKey(key.Name); !ok {
-		eventData, err := target.store.Get(key.Name)
+	return target.producer.SendMessages(msgs)
+}
+
+// SendFromStore - reads an event from store and sends it to Kafka.
+func (target *KafkaTarget) SendFromStore(key store.Key) (err error) {
+	if err = target.init(); err != nil {
+		return err
+	}
+	switch {
+	case key.ItemCount == 1:
+		var event event.Event
+		event, err = target.store.Get(key)
+		if err != nil {
+			// The last event key in a successful batch will be sent in the channel at most once by the replayEvents()
+			// Such events will not exist and would have already been sent successfully.
+			if os.IsNotExist(err) {
+				return nil
+			}
+			return err
+		}
+		err = target.send(event)
+	case key.ItemCount > 1:
+		var events []event.Event
+		events, err = target.store.GetMultiple(key)
 		if err != nil {
 			if os.IsNotExist(err) {
 				return nil
 			}
 			return err
 		}
-		msg, err := target.toProducerMessage(eventData)
-		if err != nil {
-			return err
-		}
-		if err = target.batch.Add(key.Name, msg); err != nil {
-			return err
-		}
+		err = target.sendMultiple(events)
 	}
-	// commit the batch if the key is the last one present in the store.
-	if key.IsLast || target.batch.IsFull() {
-		return target.commitBatch()
-	}
-	return nil
-}
-
-func (target *KafkaTarget) commitBatch() error {
-	keys, msgs, err := target.batch.GetAll()
 	if err != nil {
-		return err
-	}
-	if err = target.producer.SendMessages(msgs); err != nil {
 		if isKafkaConnErr(err) {
 			return store.ErrNotConnected
 		}
 		return err
 	}
-	return target.store.DelList(keys)
+	// Delete the event from store.
+	return target.store.Del(key)
 }
 
 func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.ProducerMessage, error) {
@@ -319,7 +310,18 @@ func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.Pro
 func (target *KafkaTarget) Close() error {
 	close(target.quitCh)
+
+	if target.batch != nil {
+		target.batch.Close()
+	}
+
 	if target.producer != nil {
+		if target.store != nil {
+			// It is safe to abort the current transaction if
+			// queue_dir is configured
+			target.producer.AbortTxn()
+		} else {
+			target.producer.CommitTxn()
+		}
 		target.producer.Close()
 		return target.client.Close()
 	}
@@ -442,10 +444,14 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
 		loggerOnce: loggerOnce,
 		quitCh:     make(chan struct{}),
 	}
-
 	if target.store != nil {
 		if args.BatchSize > 1 {
-			target.batch = store.NewBatch[string, *sarama.ProducerMessage](args.BatchSize)
+			target.batch = store.NewBatch[event.Event](store.BatchConfig[event.Event]{
+				Limit:         args.BatchSize,
+				Log:           loggerOnce,
+				Store:         queueStore,
+				CommitTimeout: args.BatchCommitTimeout,
+			})
 		}
 		store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
 	}
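
The `Validate` change earlier in this file encodes a three-way constraint between `queue_dir`, `batch_size`, and the new commit timeout. A minimal standalone checker mirroring that logic — `validateBatching` is a hypothetical helper, but the error strings are copied from the patch:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateBatching enforces: batching needs a queue dir, and a commit
// timeout only makes sense when batching is actually in effect.
func validateBatching(queueDir string, batchSize uint32, commitTimeout time.Duration) error {
	if batchSize > 1 && queueDir == "" {
		return errors.New("batch should be enabled only if queue dir is enabled")
	}
	if commitTimeout > 0 && (queueDir == "" || batchSize <= 1) {
		return errors.New("batch commit timeout should be set only if queue dir is enabled and batch size > 1")
	}
	return nil
}

func main() {
	fmt.Println(validateBatching("/tmp/events", 100, 30*time.Second)) // <nil>
	fmt.Println(validateBatching("", 100, 0))                         // queue dir required
	fmt.Println(validateBatching("/tmp/events", 1, 30*time.Second))   // timeout needs batching
}
```
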
diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go
index b390a6834..8f568cd3a 100644
--- a/internal/event/target/mqtt.go
+++ b/internal/event/target/mqtt.go
@@ -180,7 +180,7 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	eventData, err := target.store.Get(key.Name)
+	eventData, err := target.store.Get(key)
 	if err != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -195,14 +195,15 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Save - saves the events to the store if queuestore is configured, which will
 // be replayed when the mqtt connection is active.
 func (target *MQTTTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go
index 11f419ab5..acccedd89 100644
--- a/internal/event/target/mysql.go
+++ b/internal/event/target/mysql.go
@@ -198,7 +198,8 @@ func (target *MySQLTarget) isActive() (bool, error) {
 // Save - saves the events to the store which will be replayed when the SQL connection is active.
 func (target *MySQLTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -273,7 +274,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error {
 		}
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -291,7 +292,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - closes underneath connections to MySQL database.
diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go
index d6d781d73..b01aa141b 100644
--- a/internal/event/target/nats.go
+++ b/internal/event/target/nats.go
@@ -299,7 +299,8 @@ func (target *NATSTarget) isActive() (bool, error) {
 // Save - saves the events to the store which will be replayed when the Nats connection is active.
 func (target *NATSTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 
 	if err := target.init(); err != nil {
@@ -353,7 +354,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -367,7 +368,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - closes underneath connections to NATS server.
diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go
index cc95f288e..a44cae108 100644
--- a/internal/event/target/nsq.go
+++ b/internal/event/target/nsq.go
@@ -147,7 +147,8 @@ func (target *NSQTarget) isActive() (bool, error) {
 // Save - saves the events to the store which will be replayed when the nsq connection is active.
 func (target *NSQTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 
 	if err := target.init(); err != nil {
@@ -188,7 +189,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -203,7 +204,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - closes underneath connections to NSQD server.
diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go
index e46a766e3..1a99db673 100644
--- a/internal/event/target/postgresql.go
+++ b/internal/event/target/postgresql.go
@@ -196,7 +196,8 @@ func (target *PostgreSQLTarget) isActive() (bool, error) {
 // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
 func (target *PostgreSQLTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 
 	if err := target.init(); err != nil {
@@ -275,7 +276,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error {
 		}
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and wouldve been already been sent successfully.
@@ -293,7 +294,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - closes underneath connections to PostgreSQL database.
diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go
index b403367d6..78d6c7555 100644
--- a/internal/event/target/redis.go
+++ b/internal/event/target/redis.go
@@ -173,7 +173,8 @@ func (target *RedisTarget) isActive() (bool, error) {
 // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
 func (target *RedisTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -252,7 +253,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error {
 		target.firstPing = true
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and would've been already been sent successfully.
@@ -270,7 +271,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - releases the resources used by the pool.
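
Every target above now passes the whole `store.Key` to `Get`/`Del` while keeping the same replay contract: a missing entry means a previous replay already delivered and deleted it. A self-contained sketch of that idempotent pattern, with a plain map standing in for the on-disk queue store (all names here are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

type memStore map[string]string

func (m memStore) Get(key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", fs.ErrNotExist // analogous to os.ReadFile on a deleted entry
	}
	return v, nil
}

func (m memStore) Del(key string) { delete(m, key) }

// sendFromStore tolerates a missing key: the entry was already sent and
// deleted by an earlier replay, so redelivery is a no-op.
func sendFromStore(m memStore, key string, send func(string) error) error {
	v, err := m.Get(key)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return nil
		}
		return err
	}
	if err := send(v); err != nil {
		return err
	}
	m.Del(key)
	return nil
}

func main() {
	m := memStore{"k1": "event-payload"}
	send := func(v string) error { fmt.Println("sent:", v); return nil }
	fmt.Println(sendFromStore(m, "k1", send)) // sends, then deletes
	fmt.Println(sendFromStore(m, "k1", send)) // nil: missing key tolerated
}
```
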
diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go
index 4935b4061..e5dc4f699 100644
--- a/internal/event/target/webhook.go
+++ b/internal/event/target/webhook.go
@@ -146,7 +146,8 @@ func (target *WebhookTarget) isActive() (bool, error) {
 // which will be replayed when the webhook connection is active.
 func (target *WebhookTarget) Save(eventData event.Event) error {
 	if target.store != nil {
-		return target.store.Put(eventData)
+		_, err := target.store.Put(eventData)
+		return err
 	}
 	if err := target.init(); err != nil {
 		return err
@@ -213,7 +214,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error {
 		return err
 	}
 
-	eventData, eErr := target.store.Get(key.Name)
+	eventData, eErr := target.store.Get(key)
 	if eErr != nil {
 		// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
 		// Such events will not exist and would've been already been sent successfully.
@@ -231,7 +232,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error {
 	}
 
 	// Delete the event from store.
-	return target.store.Del(key.Name)
+	return target.store.Del(key)
 }
 
 // Close - does nothing and available for interface compatibility.
diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go
index 2b37db57e..182a1dc0b 100644
--- a/internal/logger/target/http/http.go
+++ b/internal/logger/target/http/http.go
@@ -418,7 +418,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 			if !isDirQueue {
 				err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
 			} else {
-				err = h.store.PutMultiple(entries)
+				_, err = h.store.PutMultiple(entries)
 			}
 
 			if err != nil {
@@ -530,7 +530,7 @@ func New(config Config) (*Target, error) {
 // SendFromStore - reads the log from store and sends it to webhook.
 func (h *Target) SendFromStore(key store.Key) (err error) {
 	var eventData []byte
-	eventData, err = h.store.GetRaw(key.Name)
+	eventData, err = h.store.GetRaw(key)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return nil
@@ -552,7 +552,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 	}
 
 	// Delete the event from store.
-	return h.store.Del(key.Name)
+	return h.store.Del(key)
 }
 
 // Send the log message 'entry' to the http target.
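
Most of the call-site churn in this PR follows one mechanical pattern: `Put`/`PutMultiple` now return the generated `Key` alongside the error, and callers that only need the error discard the key. A toy illustration of that signature migration (all names hypothetical):

```go
package main

import "fmt"

// Key mirrors the shape of the new store.Key; the rest is illustrative.
type Key struct{ Name string }

// put previously returned only an error; it now also returns the Key
// under which the item was persisted.
func put(item string) (Key, error) {
	return Key{Name: "uuid-for-" + item}, nil
}

// save shows the caller-side change: the key is deliberately discarded
// when only the error matters, as in the Save/Send methods above.
func save(item string) error {
	_, err := put(item)
	return err
}

func main() {
	fmt.Println(save("event")) // <nil>
}
```
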
diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go
index 29a0bf013..c66920419 100644
--- a/internal/logger/target/kafka/kafka.go
+++ b/internal/logger/target/kafka/kafka.go
@@ -315,7 +315,8 @@ func (h *Target) IsOnline(_ context.Context) bool {
 func (h *Target) Send(ctx context.Context, entry interface{}) error {
 	if h.store != nil {
 		// save the entry to the queue store which will be replayed to the target.
-		return h.store.Put(entry)
+		_, err := h.store.Put(entry)
+		return err
 	}
 	h.logChMu.RLock()
 	defer h.logChMu.RUnlock()
@@ -344,7 +345,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 
 // SendFromStore - reads the log from store and sends it to kafka.
 func (h *Target) SendFromStore(key store.Key) (err error) {
-	auditEntry, err := h.store.Get(key.Name)
+	auditEntry, err := h.store.Get(key)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return nil
@@ -358,7 +359,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 		return
 	}
 	// Delete the event from store.
-	return h.store.Del(key.Name)
+	return h.store.Del(key)
 }
 
 // Cancel - cancels the target
diff --git a/internal/store/batch.go b/internal/store/batch.go
index cba034f4b..bb31135c9 100644
--- a/internal/store/batch.go
+++ b/internal/store/batch.go
@@ -18,100 +18,123 @@
 package store
 
 import (
+	"context"
 	"errors"
-	"fmt"
 	"sync"
+	"time"
 )
 
 // ErrBatchFull indicates that the batch is full
 var ErrBatchFull = errors.New("batch is full")
 
-type key interface {
-	string | int | int64
-}
+const defaultCommitTimeout = 30 * time.Second
 
 // Batch represents an ordered batch
-type Batch[K key, T any] struct {
-	keys  []K
-	items map[K]T
-	limit uint32
+type Batch[I any] struct {
+	items  []I
+	limit  uint32
+	store  Store[I]
+	quitCh chan struct{}
+
 	sync.Mutex
 }
 
+// BatchConfig represents the batch config
+type BatchConfig[I any] struct {
+	Limit         uint32
+	Store         Store[I]
+	CommitTimeout time.Duration
+	Log           logger
+}
+
 // Add adds the item to the batch
-func (b *Batch[K, T]) Add(key K, item T) error {
+func (b *Batch[I]) Add(item I) error {
 	b.Lock()
 	defer b.Unlock()
 
 	if b.isFull() {
-		return ErrBatchFull
+		if b.store == nil {
+			return ErrBatchFull
+		}
+		// commit batch to store
+		if err := b.commit(); err != nil {
+			return err
+		}
 	}
 
-	if _, ok := b.items[key]; !ok {
-		b.keys = append(b.keys, key)
-	}
-	b.items[key] = item
-
+	b.items = append(b.items, item)
 	return nil
}
 
-// GetAll fetches the items and resets the batch
-// Returned items are not referenced by the batch
-func (b *Batch[K, T]) GetAll() (orderedKeys []K, orderedItems []T, err error) {
-	b.Lock()
-	defer b.Unlock()
-
-	orderedKeys = append([]K(nil), b.keys...)
-	for _, key := range orderedKeys {
-		item, ok := b.items[key]
-		if !ok {
-			err = fmt.Errorf("item not found for the key: %v; should not happen;", key)
-			return
-		}
-		orderedItems = append(orderedItems, item)
-		delete(b.items, key)
-	}
-
-	b.keys = b.keys[:0]
-
-	return
-}
-
-// GetByKey will get the batch item by the provided key
-func (b *Batch[K, T]) GetByKey(key K) (T, bool) {
-	b.Lock()
-	defer b.Unlock()
-
-	item, ok := b.items[key]
-	return item, ok
-}
-
 // Len returns the no of items in the batch
-func (b *Batch[K, T]) Len() int {
+func (b *Batch[_]) Len() int {
 	b.Lock()
 	defer b.Unlock()
 
-	return len(b.keys)
+	return len(b.items)
 }
 
-// IsFull checks if the batch is full or not
-func (b *Batch[K, T]) IsFull() bool {
-	b.Lock()
-	defer b.Unlock()
-
-	return b.isFull()
-}
-
-func (b *Batch[K, T]) isFull() bool {
+func (b *Batch[_]) isFull() bool {
 	return len(b.items) >= int(b.limit)
 }
 
-// NewBatch creates a new batch
-func NewBatch[K key, T any](limit uint32) *Batch[K, T] {
-	return &Batch[K, T]{
-		keys:  make([]K, 0, limit),
-		items: make(map[K]T, limit),
-		limit: limit,
+func (b *Batch[I]) commit() error {
+	switch len(b.items) {
+	case 0:
+		return nil
+	case 1:
+		if _, err := b.store.Put(b.items[0]); err != nil {
+			return err
+		}
+		// reset the batch so the committed item is not re-committed
+		// by a later ticker fire or Close
+		b.items = make([]I, 0, b.limit)
+		return nil
+	default:
 	}
+	if _, err := b.store.PutMultiple(b.items); err != nil {
+		return err
+	}
+	b.items = make([]I, 0, b.limit)
+	return nil
+}
+
+// Close commits the pending items and stops the commit goroutine
+func (b *Batch[I]) Close() error {
+	defer func() {
+		close(b.quitCh)
+	}()
+
+	b.Lock()
+	defer b.Unlock()
+	return b.commit()
+}
+
+// NewBatch creates a new batch
+func NewBatch[I any](config BatchConfig[I]) *Batch[I] {
+	if config.CommitTimeout == 0 {
+		config.CommitTimeout = defaultCommitTimeout
+	}
+	quitCh := make(chan struct{})
+	batch := &Batch[I]{
+		items:  make([]I, 0, config.Limit),
+		limit:  config.Limit,
+		store:  config.Store,
+		quitCh: quitCh,
+	}
+	if batch.store != nil {
+		go func() {
+			commitTicker := time.NewTicker(config.CommitTimeout)
+			defer commitTicker.Stop()
+			for {
+				select {
+				case <-commitTicker.C:
+				case <-batch.quitCh:
+					return
+				}
+				batch.Lock()
+				err := batch.commit()
+				batch.Unlock()
+				if err != nil {
+					config.Log(context.Background(), err, "")
+				}
+			}
+		}()
+	}
+	return batch
 }
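
A usage sketch of the reworked `Batch` lifecycle. The real callers construct it over a `QueueStore` via `store.NewBatch` (as `NewKafkaTarget` does); this self-contained, simplified version (no generics, a callback in place of `PutMultiple`) exists only to make the three flush triggers visible: a full batch, the commit ticker, and `Close`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type batch struct {
	mu     sync.Mutex
	items  []string
	limit  int
	flush  func([]string) // stands in for store.PutMultiple
	quitCh chan struct{}
}

func newBatch(limit int, timeout time.Duration, flush func([]string)) *batch {
	b := &batch{limit: limit, flush: flush, quitCh: make(chan struct{})}
	go func() {
		t := time.NewTicker(timeout)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				b.mu.Lock()
				b.commit()
				b.mu.Unlock()
			case <-b.quitCh:
				return
			}
		}
	}()
	return b
}

// commit must be called with the lock held.
func (b *batch) commit() {
	if len(b.items) == 0 {
		return
	}
	b.flush(b.items)
	b.items = nil
}

func (b *batch) Add(item string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) >= b.limit {
		b.commit() // same order as Batch.Add: flush first, then append
	}
	b.items = append(b.items, item)
}

func (b *batch) Close() {
	close(b.quitCh)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.commit()
}

func main() {
	flush := func(items []string) { fmt.Println("committed:", items) }
	b := newBatch(2, 50*time.Millisecond, flush)
	b.Add("e1")
	b.Add("e2")
	b.Add("e3")                       // batch full: flushes e1, e2
	time.Sleep(80 * time.Millisecond) // ticker flushes e3
	b.Add("e4")
	b.Close() // final flush of e4
}
```
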
diff --git a/internal/store/batch_test.go b/internal/store/batch_test.go
index db8b95dc8..faf9d4556 100644
--- a/internal/store/batch_test.go
+++ b/internal/store/batch_test.go
@@ -18,109 +18,202 @@
 package store
 
 import (
-	"errors"
+	"context"
 	"sync"
 	"testing"
+	"time"
 )
 
-func TestBatch(t *testing.T) {
+func TestBatchCommit(t *testing.T) {
+	defer func() {
+		if err := tearDownQueueStore(); err != nil {
+			t.Fatalf("Failed to tear down store; %v", err)
+		}
+	}()
+	store, err := setUpQueueStore(queueDir, 100)
+	if err != nil {
+		t.Fatalf("Failed to create a queue store; %v", err)
+	}
+
 	var limit uint32 = 100
-	batch := NewBatch[int, int](limit)
+
+	batch := NewBatch[TestItem](BatchConfig[TestItem]{
+		Limit:         limit,
+		Store:         store,
+		CommitTimeout: 5 * time.Minute,
+		Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
+			t.Log(err)
+		},
+	})
+	defer batch.Close()
+
 	for i := 0; i < int(limit); i++ {
-		if err := batch.Add(i, i); err != nil {
+		if err := batch.Add(testItem); err != nil {
 			t.Fatalf("failed to add %v; %v", i, err)
 		}
-		if _, ok := batch.GetByKey(i); !ok {
-			t.Fatalf("failed to get the item by key %v after adding", i)
-		}
-	}
-	err := batch.Add(101, 101)
-	if err == nil || !errors.Is(err, ErrBatchFull) {
-		t.Fatalf("Expected err %v but got %v", ErrBatchFull, err)
-	}
-	if !batch.IsFull() {
-		t.Fatal("Expected batch.IsFull to be true but got false")
 	}
+
 	batchLen := batch.Len()
 	if batchLen != int(limit) {
-		t.Fatalf("expected batch length to be %v but got %v", limit, batchLen)
+		t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen)
 	}
-	keys, items, err := batch.GetAll()
-	if err != nil {
-		t.Fatalf("unable to get the items from the batch; %v", err)
+
+	keys := store.List()
+	if len(keys) > 0 {
+		t.Fatalf("Expected empty store list but got len(list) %v", len(keys))
 	}
-	if len(items) != int(limit) {
-		t.Fatalf("Expected length of the batch items to be %v but got %v", limit, len(items))
-	}
-	if len(keys) != int(limit) {
-		t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items))
+	if err := batch.Add(testItem); err != nil {
+		t.Fatalf("unable to add to the batch; %v", err)
 	}
 	batchLen = batch.Len()
-	if batchLen != 0 {
-		t.Fatalf("expected batch to be empty but still left with %d items", batchLen)
-	}
-	// Add duplicate entries
-	for i := 0; i < 10; i++ {
-		if err := batch.Add(99, 99); err != nil {
-			t.Fatalf("failed to add duplicate item %v to batch after Get; %v", i, err)
-		}
-	}
-	if _, ok := batch.GetByKey(99); !ok {
-		t.Fatal("failed to get the duplicxate item by key '99' after adding")
-	}
-	keys, items, err = batch.GetAll()
-	if err != nil {
-		t.Fatalf("unable to get the items from the batch; %v", err)
-	}
-	if len(items) != 1 {
-		t.Fatalf("Expected length of the batch items to be 1 but got %v", len(items))
+	if batchLen != 1 {
+		t.Fatalf("expected batch length to be 1 but got %v", batchLen)
 	}
+	keys = store.List()
 	if len(keys) != 1 {
-		t.Fatalf("Expected length of the batch keys to be 1 but got %v", len(items))
+		t.Fatalf("expected len(store.List())=1; but got %v", len(keys))
 	}
-	// try adding again after Get.
+	key := keys[0]
+	if !key.Compress {
+		t.Fatal("expected key.Compress=true; but got false")
+	}
+	if key.ItemCount != int(limit) {
+		t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount)
+	}
+	items, err := store.GetMultiple(key)
+	if err != nil {
+		t.Fatalf("unable to read key %v; %v", key.String(), err)
+	}
+	if len(items) != int(limit) {
+		t.Fatalf("expected len(items)=%d; but got %v", limit, len(items))
+	}
+}
+
+func TestBatchCommitOnExit(t *testing.T) {
+	defer func() {
+		if err := tearDownQueueStore(); err != nil {
+			t.Fatalf("Failed to tear down store; %v", err)
+		}
+	}()
+	store, err := setUpQueueStore(queueDir, 100)
+	if err != nil {
+		t.Fatalf("Failed to create a queue store; %v", err)
+	}
+
+	var limit uint32 = 100
+
+	batch := NewBatch[TestItem](BatchConfig[TestItem]{
+		Limit:         limit,
+		Store:         store,
+		CommitTimeout: 5 * time.Minute,
+		Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
+			t.Log([]any{err, id, errKind}...)
+		},
+	})
+
 	for i := 0; i < int(limit); i++ {
-		if err := batch.Add(i, i); err != nil {
-			t.Fatalf("failed to add item %v to batch after Get; %v", i, err)
-		}
-		if _, ok := batch.GetByKey(i); !ok {
-			t.Fatalf("failed to get the item by key %v after adding", i)
+		if err := batch.Add(testItem); err != nil {
+			t.Fatalf("failed to add %v; %v", i, err)
 		}
 	}
+
+	batch.Close()
+	time.Sleep(1 * time.Second)
+
+	batchLen := batch.Len()
+	if batchLen != 0 {
+		t.Fatalf("Expected batch.Len()=0; but got %v", batchLen)
+	}
+
+	keys := store.List()
+	if len(keys) != 1 {
+		t.Fatalf("expected len(store.List())=1; but got %v", len(keys))
+	}
+
+	key := keys[0]
+	if !key.Compress {
+		t.Fatal("expected key.Compress=true; but got false")
+	}
+	if key.ItemCount != int(limit) {
+		t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount)
+	}
+	items, err := store.GetMultiple(key)
+	if err != nil {
+		t.Fatalf("unable to read key %v; %v", key.String(), err)
+	}
+	if len(items) != int(limit) {
+		t.Fatalf("expected len(items)=%d; but got %v", limit, len(items))
+	}
 }
 
 func TestBatchWithConcurrency(t *testing.T) {
+	defer func() {
+		if err := tearDownQueueStore(); err != nil {
+			t.Fatalf("Failed to tear down store; %v", err)
+		}
+	}()
+	store, err := setUpQueueStore(queueDir, 100)
+	if err != nil {
+		t.Fatalf("Failed to create a queue store; %v", err)
+	}
+
 	var limit uint32 = 100
-	batch := NewBatch[int, int](limit)
+
+	batch := NewBatch[TestItem](BatchConfig[TestItem]{
+		Limit:         limit,
+		Store:         store,
+		CommitTimeout: 5 * time.Minute,
+		Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
+			t.Log(err)
+		},
+	})
+	defer batch.Close()
 
 	var wg sync.WaitGroup
 	for i := 0; i < int(limit); i++ {
 		wg.Add(1)
-		go func(item int) {
+		go func(key int) {
 			defer wg.Done()
-			if err := batch.Add(item, item); err != nil {
-				t.Errorf("failed to add item %v; %v", item, err)
+			if err := batch.Add(testItem); err != nil {
+				t.Errorf("failed to add item %v; %v", key, err)
 				return
 			}
-			if _, ok := batch.GetByKey(item); !ok {
-				t.Errorf("failed to get the item by key %v after adding", item)
-			}
 		}(i)
 	}
 	wg.Wait()
 
-	keys, items, err := batch.GetAll()
+	batchLen := batch.Len()
+	if batchLen != int(limit) {
+		t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen)
+	}
+
+	keys := store.List()
+	if len(keys) > 0 {
+		t.Fatalf("Expected empty store list but got len(list) %v", len(keys))
+	}
+	if err := batch.Add(testItem); err != nil {
+		t.Fatalf("unable to add to the batch; %v", err)
+	}
+	batchLen = batch.Len()
+	if batchLen != 1 {
+ t.Fatalf("expected batch length to be 1 but got %v", batchLen) + } + keys = store.List() + if len(keys) != 1 { + t.Fatalf("expected len(store.List())=1; but got %v", len(keys)) + } + key := keys[0] + if !key.Compress { + t.Fatal("expected key.Compress=true; but got false") + } + if key.ItemCount != int(limit) { + t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount) + } + items, err := store.GetMultiple(key) if err != nil { - t.Fatalf("unable to get the items from the batch; %v", err) + t.Fatalf("unable to read key %v; %v", key.String(), err) } if len(items) != int(limit) { - t.Fatalf("expected batch length %v but got %v", limit, len(items)) - } - if len(keys) != int(limit) { - t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items)) - } - batchLen := batch.Len() - if batchLen != 0 { - t.Fatalf("expected batch to be empty but still left with %d items", batchLen) + t.Fatalf("expected len(items)=%d; but got %v", limit, len(items)) } } diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index 002202f49..1ee6278bb 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -18,24 +18,25 @@ package store import ( + "bytes" "encoding/json" "errors" - "fmt" "os" "path/filepath" "sort" - "strings" "sync" "time" "github.com/google/uuid" jsoniter "github.com/json-iterator/go" + "github.com/klauspost/compress/s2" "github.com/valyala/bytebufferpool" ) const ( defaultLimit = 100000 // Default store limit. defaultExt = ".unknown" + compressExt = ".snappy" ) // errLimitExceeded error is sent when the maximum limit is reached. @@ -83,18 +84,12 @@ func (store *QueueStore[_]) Open() error { return err } - // Truncate entries. - if uint64(len(files)) > store.entryLimit { - files = files[:store.entryLimit] - } - for _, file := range files { if file.IsDir() { continue } - key := strings.TrimSuffix(file.Name(), store.fileExt) if fi, err := file.Info(); err == nil { - store.entries[key] = fi.ModTime().UnixNano() + store.entries[file.Name()] = fi.ModTime().UnixNano() } } @@ -107,96 +102,138 @@ func (store *QueueStore[_]) Delete() error { } // PutMultiple - puts an item to the store. -func (store *QueueStore[I]) PutMultiple(item []I) error { +func (store *QueueStore[I]) PutMultiple(items []I) (Key, error) { // Generate a new UUID for the key. - key, err := uuid.NewRandom() + uid, err := uuid.NewRandom() if err != nil { - return err + return Key{}, err } store.Lock() defer store.Unlock() if uint64(len(store.entries)) >= store.entryLimit { - return errLimitExceeded + return Key{}, errLimitExceeded } - return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item) + key := Key{ + Name: uid.String(), + ItemCount: len(items), + Compress: true, + Extension: store.fileExt, + } + return key, store.multiWrite(key, items) } // multiWrite - writes an item to the directory. 
diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go
index 002202f49..1ee6278bb 100644
--- a/internal/store/queuestore.go
+++ b/internal/store/queuestore.go
@@ -18,24 +18,25 @@
 package store
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"os"
 	"path/filepath"
 	"sort"
-	"strings"
 	"sync"
 	"time"
 
 	"github.com/google/uuid"
 	jsoniter "github.com/json-iterator/go"
+	"github.com/klauspost/compress/s2"
 	"github.com/valyala/bytebufferpool"
 )
 
 const (
 	defaultLimit = 100000 // Default store limit.
 	defaultExt   = ".unknown"
+	compressExt  = ".snappy"
 )
 
 // errLimitExceeded error is sent when the maximum limit is reached.
@@ -83,18 +84,12 @@ func (store *QueueStore[_]) Open() error {
 		return err
 	}
 
-	// Truncate entries.
-	if uint64(len(files)) > store.entryLimit {
-		files = files[:store.entryLimit]
-	}
-
 	for _, file := range files {
 		if file.IsDir() {
 			continue
 		}
 
-		key := strings.TrimSuffix(file.Name(), store.fileExt)
 		if fi, err := file.Info(); err == nil {
-			store.entries[key] = fi.ModTime().UnixNano()
+			store.entries[file.Name()] = fi.ModTime().UnixNano()
 		}
 	}
@@ -107,96 +102,138 @@ func (store *QueueStore[_]) Delete() error {
 }
 
 // PutMultiple - puts an item to the store.
-func (store *QueueStore[I]) PutMultiple(item []I) error {
+func (store *QueueStore[I]) PutMultiple(items []I) (Key, error) {
 	// Generate a new UUID for the key.
-	key, err := uuid.NewRandom()
+	uid, err := uuid.NewRandom()
 	if err != nil {
-		return err
+		return Key{}, err
 	}
 
 	store.Lock()
 	defer store.Unlock()
 
 	if uint64(len(store.entries)) >= store.entryLimit {
-		return errLimitExceeded
+		return Key{}, errLimitExceeded
 	}
 
-	return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item)
+	key := Key{
+		Name:      uid.String(),
+		ItemCount: len(items),
+		Compress:  true,
+		Extension: store.fileExt,
+	}
+	return key, store.multiWrite(key, items)
 }
 
 // multiWrite - writes an item to the directory.
-func (store *QueueStore[I]) multiWrite(key string, item []I) error {
+func (store *QueueStore[I]) multiWrite(key Key, items []I) (err error) {
 	buf := bytebufferpool.Get()
 	defer bytebufferpool.Put(buf)
 
 	enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
-	for i := range item {
-		err := enc.Encode(item[i])
-		if err != nil {
+	for i := range items {
+		if err = enc.Encode(items[i]); err != nil {
 			return err
 		}
 	}
 
-	b := buf.Bytes()
-	path := filepath.Join(store.directory, key+store.fileExt)
-	err := os.WriteFile(path, b, os.FileMode(0o770))
+	path := filepath.Join(store.directory, key.String())
+	if key.Compress {
+		err = os.WriteFile(path, s2.Encode(nil, buf.Bytes()), os.FileMode(0o770))
+	} else {
+		err = os.WriteFile(path, buf.Bytes(), os.FileMode(0o770))
+	}
+	buf.Reset()
 	if err != nil {
 		return err
 	}
 
 	// Increment the item count.
-	store.entries[key] = time.Now().UnixNano()
+	store.entries[key.String()] = time.Now().UnixNano()
 
-	return nil
+	return
 }
 
 // write - writes an item to the directory.
-func (store *QueueStore[I]) write(key string, item I) error {
+func (store *QueueStore[I]) write(key Key, item I) error {
 	// Marshalls the item.
 	eventData, err := json.Marshal(item)
 	if err != nil {
 		return err
 	}
+	return store.writeBytes(key, eventData)
+}
 
-	path := filepath.Join(store.directory, key+store.fileExt)
-	if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil {
-		return err
+// writeBytes - writes bytes to the directory.
+func (store *QueueStore[I]) writeBytes(key Key, b []byte) (err error) {
+	path := filepath.Join(store.directory, key.String())
+
+	if key.Compress {
+		err = os.WriteFile(path, s2.Encode(nil, b), os.FileMode(0o770))
+	} else {
+		err = os.WriteFile(path, b, os.FileMode(0o770))
 	}
 
+	if err != nil {
+		return err
+	}
 	// Increment the item count.
-	store.entries[key] = time.Now().UnixNano()
-
+	store.entries[key.String()] = time.Now().UnixNano()
 	return nil
 }
 
 // Put - puts an item to the store.
-func (store *QueueStore[I]) Put(item I) error {
+func (store *QueueStore[I]) Put(item I) (Key, error) {
 	store.Lock()
 	defer store.Unlock()
 
 	if uint64(len(store.entries)) >= store.entryLimit {
-		return errLimitExceeded
+		return Key{}, errLimitExceeded
 	}
 
 	// Generate a new UUID for the key.
-	key, err := uuid.NewRandom()
+	uid, err := uuid.NewRandom()
 	if err != nil {
-		return err
+		return Key{}, err
 	}
 
-	return store.write(key.String(), item)
+	key := Key{
+		Name:      uid.String(),
+		Extension: store.fileExt,
+		ItemCount: 1,
+	}
+	return key, store.write(key, item)
+}
+
+// PutRaw - puts the raw bytes to the store
+func (store *QueueStore[I]) PutRaw(b []byte) (Key, error) {
+	store.Lock()
+	defer store.Unlock()
+
+	if uint64(len(store.entries)) >= store.entryLimit {
+		return Key{}, errLimitExceeded
+	}
+
+	// Generate a new UUID for the key.
+	uid, err := uuid.NewRandom()
+	if err != nil {
+		return Key{}, err
+	}
+
+	key := Key{
+		Name:      uid.String(),
+		Extension: store.fileExt,
+	}
+	return key, store.writeBytes(key, b)
 }
 
 // GetRaw - gets an item from the store.
-func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) {
+func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) {
 	store.RLock()
 
 	defer func(store *QueueStore[I]) {
 		store.RUnlock()
-		if err != nil {
+		if err != nil && !os.IsNotExist(err) {
 			// Upon error we remove the entry.
 			store.Del(key)
 		}
 	}(store)
 
-	raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt))
+	raw, err = os.ReadFile(filepath.Join(store.directory, key.String()))
 	if err != nil {
 		return
 	}
@@ -209,19 +246,19 @@ func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) {
 }
 
 // Get - gets an item from the store.
-func (store *QueueStore[I]) Get(key string) (item I, err error) {
+func (store *QueueStore[I]) Get(key Key) (item I, err error) {
 	store.RLock()
 
 	defer func(store *QueueStore[I]) {
 		store.RUnlock()
-		if err != nil {
+		if err != nil && !os.IsNotExist(err) {
 			// Upon error we remove the entry.
 			store.Del(key)
 		}
 	}(store)
 
 	var eventData []byte
-	eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt))
+	eventData, err = os.ReadFile(filepath.Join(store.directory, key.String()))
 	if err != nil {
 		return item, err
 	}
@@ -237,28 +274,52 @@ func (store *QueueStore[I]) Get(key string) (item I, err error) {
 	return item, nil
 }
 
+// GetMultiple will read the multi-payload file and fetch the items
+func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) {
+	store.RLock()
+
+	defer func(store *QueueStore[I]) {
+		store.RUnlock()
+		if err != nil && !os.IsNotExist(err) {
+			// Upon error we remove the entry.
+			store.Del(key)
+		}
+	}(store)
+
+	raw, err := os.ReadFile(filepath.Join(store.directory, key.String()))
+	if err != nil {
+		return
+	}
+
+	var decoder *jsoniter.Decoder
+	if key.Compress {
+		decodedBytes, err := s2.Decode(nil, raw)
+		if err != nil {
+			return nil, err
+		}
+		decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes))
+	} else {
+		decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
+	}
+
+	for decoder.More() {
+		var item I
+		if err := decoder.Decode(&item); err != nil {
+			return nil, err
+		}
+		items = append(items, item)
+	}
+
+	return
+}
+
 // Del - Deletes an entry from the store.
-func (store *QueueStore[_]) Del(key string) error {
+func (store *QueueStore[_]) Del(key Key) error {
 	store.Lock()
 	defer store.Unlock()
 	return store.del(key)
 }
 
-// DelList - Deletes a list of entries from the store.
-// Returns an error even if one key fails to be deleted.
-func (store *QueueStore[_]) DelList(keys []string) error {
-	store.Lock()
-	defer store.Unlock()
-
-	for _, key := range keys {
-		if err := store.del(key); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 // Len returns the entry count.
@@ -268,30 +329,35 @@ func (store *QueueStore[_]) Len() int {
 }
 
 // lockless call
-func (store *QueueStore[_]) del(key string) error {
-	err := os.Remove(filepath.Join(store.directory, key+store.fileExt))
+func (store *QueueStore[_]) del(key Key) error {
+	err := os.Remove(filepath.Join(store.directory, key.String()))
 
 	// Delete as entry no matter the result
-	delete(store.entries, key)
+	delete(store.entries, key.String())
 
 	return err
 }
 
 // List - lists all files registered in the store.
-func (store *QueueStore[_]) List() ([]string, error) {
+func (store *QueueStore[_]) List() (keys []Key) {
 	store.RLock()
-	l := make([]string, 0, len(store.entries))
-	for k := range store.entries {
-		l = append(l, k)
+	defer store.RUnlock()
+
+	entries := make([]string, 0, len(store.entries))
+	for entry := range store.entries {
+		entries = append(entries, entry)
 	}
 
 	// Sort entries...
-	sort.Slice(l, func(i, j int) bool {
-		return store.entries[l[i]] < store.entries[l[j]]
+	sort.Slice(entries, func(i, j int) bool {
+		return store.entries[entries[i]] < store.entries[entries[j]]
 	})
-	store.RUnlock()
 
-	return l, nil
+	for i := range entries {
+		keys = append(keys, parseKey(entries[i]))
+	}
+
+	return keys
 }
 
 // list will read all entries from disk.
@@ -318,9 +384,3 @@ func (store *QueueStore[_]) list() ([]os.DirEntry, error) {
 
 	return files, nil
 }
-
-// Extension will return the file extension used
-// for the items stored in the queue.
-func (store *QueueStore[_]) Extension() string {
-	return store.fileExt
-}
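
`QueueStore.List` above returns keys sorted by the entries' recorded modification times, so replay preserves arrival order. The core of that, reduced to a standalone function over a name-to-mtime map — `listOldestFirst` is hypothetical; the real method additionally parses each name into a `Key`:

```go
package main

import (
	"fmt"
	"sort"
)

// listOldestFirst sorts entry names by their recorded UnixNano mod times,
// mirroring the sort.Slice comparison in QueueStore.List.
func listOldestFirst(entries map[string]int64) []string {
	names := make([]string, 0, len(entries))
	for name := range entries {
		names = append(names, name)
	}
	sort.Slice(names, func(i, j int) bool {
		return entries[names[i]] < entries[names[j]]
	})
	return names
}

func main() {
	entries := map[string]int64{
		"b.event": 300, // UnixNano mod times
		"a.event": 100,
		"c.event": 200,
	}
	fmt.Println(listOldestFirst(entries)) // [a.event c.event b.event]
}
```
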
diff --git a/internal/store/queuestore_test.go b/internal/store/queuestore_test.go
index 680cb177b..bcb47049d 100644
--- a/internal/store/queuestore_test.go
+++ b/internal/store/queuestore_test.go
@@ -18,10 +18,10 @@
 package store
 
 import (
+	"fmt"
 	"os"
 	"path/filepath"
 	"reflect"
-	"strings"
 	"testing"
 )
 
@@ -66,17 +66,14 @@ func TestQueueStorePut(t *testing.T) {
 	}
 	// Put 100 items.
 	for i := 0; i < 100; i++ {
-		if err := store.Put(testItem); err != nil {
+		if _, err := store.Put(testItem); err != nil {
 			t.Fatal("Failed to put to queue store ", err)
 		}
 	}
 	// Count the items.
-	names, err := store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(names) != 100 {
-		t.Fatalf("List() Expected: 100, got %d", len(names))
+	keys := store.List()
+	if len(keys) != 100 {
+		t.Fatalf("List() Expected: 100, got %d", len(keys))
 	}
 }
 
@@ -93,18 +90,15 @@ func TestQueueStoreGet(t *testing.T) {
 	}
 	// Put 10 items
 	for i := 0; i < 10; i++ {
-		if err := store.Put(testItem); err != nil {
+		if _, err := store.Put(testItem); err != nil {
 			t.Fatal("Failed to put to queue store ", err)
 		}
 	}
-	itemKeys, err := store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
+	itemKeys := store.List()
 	// Get 10 items.
 	if len(itemKeys) == 10 {
 		for _, key := range itemKeys {
-			item, eErr := store.Get(strings.TrimSuffix(key, testItemExt))
+			item, eErr := store.Get(key)
 			if eErr != nil {
 				t.Fatal("Failed to Get the item from the queue store ", eErr)
 			}
@@ -130,18 +124,15 @@ func TestQueueStoreDel(t *testing.T) {
 	}
 	// Put 20 items.
 	for i := 0; i < 20; i++ {
-		if err := store.Put(testItem); err != nil {
+		if _, err := store.Put(testItem); err != nil {
 			t.Fatal("Failed to put to queue store ", err)
 		}
 	}
-	itemKeys, err := store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
+	itemKeys := store.List()
 	// Remove all the items.
 	if len(itemKeys) == 20 {
 		for _, key := range itemKeys {
-			err := store.Del(strings.TrimSuffix(key, testItemExt))
+			err := store.Del(key)
 			if err != nil {
 				t.Fatal("queue store Del failed with ", err)
 			}
@@ -150,12 +141,9 @@ func TestQueueStoreDel(t *testing.T) {
 		t.Fatalf("List() Expected: 20, got %d", len(itemKeys))
 	}
 
-	names, err := store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(names) != 0 {
-		t.Fatalf("List() Expected: 0, got %d", len(names))
+	keys := store.List()
+	if len(keys) != 0 {
+		t.Fatalf("List() Expected: 0, got %d", len(keys))
 	}
 }
 
@@ -172,12 +160,12 @@ func TestQueueStoreLimit(t *testing.T) {
 		t.Fatal("Failed to create a queue store ", err)
 	}
 	for i := 0; i < 5; i++ {
-		if err := store.Put(testItem); err != nil {
+		if _, err := store.Put(testItem); err != nil {
 			t.Fatal("Failed to put to queue store ", err)
 		}
 	}
 	// Should not allow 6th Put.
-	if err := store.Put(testItem); err == nil {
+	if _, err := store.Put(testItem); err == nil {
 		t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
 	}
 }
@@ -194,18 +182,15 @@ func TestQueueStoreListN(t *testing.T) {
 		t.Fatal("Failed to create a queue store ", err)
 	}
 	for i := 0; i < 10; i++ {
-		if err := store.Put(testItem); err != nil {
+		if _, err := store.Put(testItem); err != nil {
 			t.Fatal("Failed to put to queue store ", err)
 		}
 	}
 	// Should return all the item keys in the store.
-	names, err := store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
+	keys := store.List()
 
-	if len(names) != 10 {
-		t.Fatalf("List() Expected: 10, got %d", len(names))
+	if len(keys) != 10 {
+		t.Fatalf("List() Expected: 10, got %d", len(keys))
 	}
 
 	// re-open
@@ -213,28 +198,154 @@ func TestQueueStoreListN(t *testing.T) {
 	if err != nil {
 		t.Fatal("Failed to create a queue store ", err)
 	}
-	names, err = store.List()
-	if err != nil {
-		t.Fatal(err)
-	}
+	keys = store.List()
 
-	if len(names) != 10 {
-		t.Fatalf("List() Expected: 10, got %d", len(names))
+	if len(keys) != 10 {
+		t.Fatalf("List() Expected: 10, got %d", len(keys))
 	}
-	if len(names) != store.Len() {
-		t.Fatalf("List() Expected: 10, got %d", len(names))
+	if len(keys) != store.Len() {
+		t.Fatalf("List() Expected: 10, got %d", len(keys))
 	}
 
 	// Delete all
-	for _, key := range names {
+	for _, key := range keys {
 		err := store.Del(key)
 		if err != nil {
 			t.Fatal(err)
 		}
 	}
 	// Re-list
-	lst, err := store.List()
-	if len(lst) > 0 || err != nil {
-		t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", lst, err)
+	keys = store.List()
+	if len(keys) > 0 || err != nil {
+		t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
+	}
+}
+
+func TestMultiplePutGets(t *testing.T) {
+	defer func() {
+		if err := tearDownQueueStore(); err != nil {
+			t.Fatalf("Failed to tear down store; %v", err)
+		}
+	}()
+	store, err := setUpQueueStore(queueDir, 10)
+	if err != nil {
+		t.Fatalf("Failed to create a queue store; %v", err)
+	}
+	// TestItem{Name: "test-item", Property: "property"}
+	var items []TestItem
+	for i := 0; i < 10; i++ {
+		items = append(items, TestItem{
+			Name:     fmt.Sprintf("test-item-%d", i),
+			Property: "property",
+		})
+	}
+
+	if _, err := store.PutMultiple(items); err != nil {
+		t.Fatalf("failed to put multiple; %v", err)
+	}
+
+	keys := store.List()
+	if len(keys) != 1 {
+		t.Fatalf("expected len(keys)=1, but found %d", len(keys))
+	}
+
+	key := keys[0]
+	if !key.Compress {
+		t.Fatal("expected the item to be compressed")
+	}
+	if key.ItemCount != 10 {
+		t.Fatalf("expected itemcount=10 but found %v", key.ItemCount)
+	}
+
+	resultItems, err := store.GetMultiple(key)
+	if err != nil {
+		t.Fatalf("unable to get multiple items; %v", err)
+	}
+
+	if !reflect.DeepEqual(resultItems, items) {
+		t.Fatalf("expected item list: %v; but got %v", items, resultItems)
+	}
+
+	if err := store.Del(key); err != nil {
+		t.Fatalf("unable to Del; %v", err)
+	}
+
+	// Re-list
+	keys = store.List()
+	if len(keys) > 0 || err != nil {
+		t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
+	}
+}
+
+func TestMixedPutGets(t *testing.T) {
+	defer func() {
+		if err := tearDownQueueStore(); err != nil {
+			t.Fatalf("Failed to tear down store; %v", err)
+		}
+	}()
+	store, err := setUpQueueStore(queueDir, 10)
+	if err != nil {
+		t.Fatalf("Failed to create a queue store; %v", err)
+	}
+	// TestItem{Name: "test-item", Property: "property"}
+	var items []TestItem
+	for i := 0; i < 5; i++ {
+		items = append(items, TestItem{
+			Name:     fmt.Sprintf("test-item-%d", i),
+			Property: "property",
+		})
+	}
+	if _, err := store.PutMultiple(items); err != nil {
+		t.Fatalf("failed to put multiple; %v", err)
+	}
+
+	for i := 5; i < 10; i++ {
+		item := TestItem{
+			Name:     fmt.Sprintf("test-item-%d", i),
+			Property: "property",
+		}
+		if _, err := store.Put(item); err != nil {
+			t.Fatalf("unable to store.Put(); %v", err)
+		}
+		items = append(items, item)
+	}
+
+	keys := store.List()
+	if len(keys) != 6 {
+		// 1 multiple + 5 single PUTs
+		t.Fatalf("expected len(keys)=6, but found %d", len(keys))
+	}
+
+	var resultItems []TestItem
+	for _, key := range keys {
+		if key.ItemCount > 1 {
+			items, err := store.GetMultiple(key)
+			if err != nil {
+				t.Fatalf("unable to get multiple items; %v", err)
+			}
+			resultItems = append(resultItems, items...)
+			continue
+		}
+		item, err := store.Get(key)
+		if err != nil {
+			t.Fatalf("unable to get item; %v", err)
+		}
+		resultItems = append(resultItems, item)
+	}
+
+	if !reflect.DeepEqual(resultItems, items) {
+		t.Fatalf("expected item list: %v; but got %v", items, resultItems)
+	}
+
+	// Delete all
+	for _, key := range keys {
+		if err := store.Del(key); err != nil {
+			t.Fatalf("unable to Del; %v", err)
+		}
+	}
+	// Re-list
+	keys = store.List()
+	if len(keys) > 0 || err != nil {
+		t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
 	}
 }
diff --git a/internal/store/store.go b/internal/store/store.go
index a72721856..fcd76dc60 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 
@@ -44,23 +45,64 @@ type Target interface {
 
 // Store - Used to persist items.
 type Store[I any] interface {
-	Put(item I) error
-	PutMultiple(item []I) error
-	Get(key string) (I, error)
-	GetRaw(key string) ([]byte, error)
+	Put(item I) (Key, error)
+	PutMultiple(item []I) (Key, error)
+	Get(key Key) (I, error)
+	GetMultiple(key Key) ([]I, error)
+	GetRaw(key Key) ([]byte, error)
+	PutRaw(b []byte) (Key, error)
 	Len() int
-	List() ([]string, error)
-	Del(key string) error
-	DelList(key []string) error
+	List() []Key
+	Del(key Key) error
 	Open() error
 	Delete() error
-	Extension() string
 }
 
 // Key denotes the key present in the store.
 type Key struct {
-	Name   string
-	IsLast bool
+	Name      string
+	Compress  bool
+	Extension string
+	ItemCount int
+}
+
+// String returns the on-disk file name for the key
+func (k Key) String() string {
+	keyStr := k.Name
+	if k.ItemCount > 1 {
+		keyStr = fmt.Sprintf("%d:%s", k.ItemCount, k.Name)
+	}
+	return keyStr + k.Extension + func() string {
+		if k.Compress {
+			return compressExt
+		}
+		return ""
+	}()
+}
+
+func getItemCount(k string) (count int, err error) {
+	count = 1
+	v := strings.Split(k, ":")
+	if len(v) == 2 {
+		return strconv.Atoi(v[0])
+	}
+	return
+}
+
+func parseKey(k string) (key Key) {
+	key.Name = k
+	if strings.HasSuffix(k, compressExt) {
+		key.Compress = true
+		key.Name = strings.TrimSuffix(key.Name, compressExt)
+	}
+	if key.ItemCount, _ = getItemCount(k); key.ItemCount > 1 {
+		key.Name = strings.TrimPrefix(key.Name, fmt.Sprintf("%d:", key.ItemCount))
+	}
+	if vals := strings.Split(key.Name, "."); len(vals) == 2 {
+		key.Extension = "." + vals[1]
+		key.Name = strings.TrimSuffix(key.Name, key.Extension)
+	}
+	return
 }
 
 // replayItems - Reads the items from the store and replays.
@@ -74,18 +116,12 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id s
 	defer retryTicker.Stop()
 
 	for {
-		names, err := store.List()
-		if err != nil {
-			log(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id)
-		} else {
-			keyCount := len(names)
-			for i, name := range names {
-				select {
-				case keyCh <- Key{strings.TrimSuffix(name, store.Extension()), keyCount == i+1}:
-					// Get next key.
-				case <-doneCh:
-					return
-				}
+		for _, key := range store.List() {
+			select {
+			case keyCh <- key:
+				// Get next key.
+			case <-doneCh:
+				return
 			}
 		}
 
@@ -114,7 +150,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l
 
 			logger(
 				context.Background(),
-				fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err),
+				fmt.Errorf("unable to send log entry to '%s' err '%w'", target.Name(), err),
 				target.Name(),
 			)
 
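
The naming scheme that `Key.String` and `parseKey` above agree on is `[<itemcount>:]<name><extension>[.snappy]`. A tiny standalone round-trip makes the format visible at a glance — reimplemented here with lowercase hypothetical names, using `strings.Cut` in place of the patch's `strings.Split`:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type key struct {
	name      string
	compress  bool
	extension string
	itemCount int
}

// fileName builds [<itemcount>:]<name><extension>[.snappy].
func (k key) fileName() string {
	s := k.name
	if k.itemCount > 1 {
		s = strconv.Itoa(k.itemCount) + ":" + s
	}
	s += k.extension
	if k.compress {
		s += ".snappy"
	}
	return s
}

// parse inverts fileName, defaulting itemCount to 1 like parseKey.
func parse(f string) (k key) {
	k = key{name: f, itemCount: 1}
	if strings.HasSuffix(k.name, ".snappy") {
		k.compress = true
		k.name = strings.TrimSuffix(k.name, ".snappy")
	}
	if c, rest, ok := strings.Cut(k.name, ":"); ok {
		if n, err := strconv.Atoi(c); err == nil {
			k.itemCount = n
			k.name = rest
		}
	}
	if i := strings.LastIndex(k.name, "."); i >= 0 {
		k.extension = k.name[i:]
		k.name = k.name[:i]
	}
	return k
}

func main() {
	k := key{name: "01894394-d046-4783-ba0d-f1c6885790dc", compress: true, extension: ".event", itemCount: 100}
	fmt.Println(k.fileName())        // 100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy
	fmt.Println(parse(k.fileName())) // round-trips to the same fields
}
```
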
diff --git a/internal/store/store_test.go b/internal/store/store_test.go
new file mode 100644
index 000000000..4c05f3d73
--- /dev/null
+++ b/internal/store/store_test.go
@@ -0,0 +1,146 @@
+// Copyright (c) 2015-2024 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package store
+
+import (
+	"testing"
+)
+
+func TestKeyString(t *testing.T) {
+	testCases := []struct {
+		key            Key
+		expectedString string
+	}{
+		{
+			key: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+			},
+			expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event",
+		},
+		{
+			key: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Compress:  true,
+				Extension: ".event",
+				ItemCount: 100,
+			},
+			expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
+		},
+		{
+			key: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+				ItemCount: 100,
+			},
+			expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event",
+		},
+		{
+			key: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Compress:  true,
+				Extension: ".event",
+				ItemCount: 1,
+			},
+			expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
+		},
+		{
+			key: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+				ItemCount: 1,
+			},
+			expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event",
+		},
+	}
+
+	for i, testCase := range testCases {
+		if testCase.expectedString != testCase.key.String() {
+			t.Fatalf("case[%v]: key.String() Expected: %s, got %s", i, testCase.expectedString, testCase.key.String())
+		}
+	}
+}
+
+func TestParseKey(t *testing.T) {
+	testCases := []struct {
+		str         string
+		expectedKey Key
+	}{
+		{
+			str: "01894394-d046-4783-ba0d-f1c6885790dc.event",
+			expectedKey: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+				ItemCount: 1,
+			},
+		},
+		{
+			str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
+			expectedKey: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Compress:  true,
+				Extension: ".event",
+				ItemCount: 100,
+			},
+		},
+		{
+			str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event",
+			expectedKey: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+				ItemCount: 100,
+			},
+		},
+		{
+			str: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
+			expectedKey: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Compress:  true,
+				Extension: ".event",
+				ItemCount: 1,
+			},
+		},
+		{
+			str: "01894394-d046-4783-ba0d-f1c6885790dc.event",
+			expectedKey: Key{
+				Name:      "01894394-d046-4783-ba0d-f1c6885790dc",
+				Extension: ".event",
+				ItemCount: 1,
+			},
+		},
+	}
+
+	for i, testCase := range testCases {
+		key := parseKey(testCase.str)
+		if testCase.expectedKey.Name != key.Name {
+			t.Fatalf("case[%v]: Expected key.Name: %v, got %v", i, testCase.expectedKey.Name, key.Name)
+		}
+		if testCase.expectedKey.Compress != key.Compress {
+			t.Fatalf("case[%v]: Expected key.Compress: %v, got %v", i, testCase.expectedKey.Compress, key.Compress)
+		}
+		if testCase.expectedKey.Extension != key.Extension {
+			t.Fatalf("case[%v]: Expected key.Extension: %v, got %v", i, testCase.expectedKey.Extension, key.Extension)
+		}
+		if testCase.expectedKey.ItemCount != key.ItemCount {
+			t.Fatalf("case[%v]: Expected key.ItemCount: %v, got %v", i, testCase.expectedKey.ItemCount, key.ItemCount)
+		}
+		if testCase.expectedKey.String() != key.String() {
+			t.Fatalf("case[%v]: Expected key.String(): %v, got %v", i, testCase.expectedKey.String(), key.String())
+		}
+	}
+}
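
Finally, how the pieces compose end to end, with in-memory stand-ins throughout: a full batch commits as one multi-item entry whose on-disk name carries the item count, and replay reads the entry back, sends it as a single broker batch, and deletes it. Everything below is hypothetical scaffolding, not MinIO code:

```go
package main

import "fmt"

type entry struct {
	items []string
}

type store struct {
	entries map[string]entry
	seq     int
}

// putMultiple persists all items under one name encoding the item count,
// like QueueStore.PutMultiple does via Key.String.
func (s *store) putMultiple(items []string) string {
	s.seq++
	name := fmt.Sprintf("%d:entry-%d.event", len(items), s.seq)
	s.entries[name] = entry{items: append([]string(nil), items...)}
	return name
}

func (s *store) getMultiple(name string) []string { return s.entries[name].items }
func (s *store) del(name string)                  { delete(s.entries, name) }

func main() {
	s := &store{entries: map[string]entry{}}

	// Save path: a full batch commits once.
	batch := []string{"e1", "e2", "e3"}
	name := s.putMultiple(batch)
	fmt.Println("stored as:", name) // stored as: 3:entry-1.event

	// Replay path: one read, one broker send, one delete.
	events := s.getMultiple(name)
	fmt.Println("sending batch to broker:", events)
	s.del(name)
	fmt.Println("entries left:", len(s.entries)) // entries left: 0
}
```
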