From 261111e728d3ecadcdaa200cf77710763d3aad54 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Sat, 7 Sep 2024 04:36:30 +0530 Subject: [PATCH] Kafka notify: support batched commits for queue store (#20377) The items will be saved per target batch and will be committed to the queue store when the batch is full Also, periodically commit the batched items to the queue store based on configured commit_timeout; default is 30s; Bonus: compress queue store multi writes --- internal/config/notify/help.go | 12 ++ internal/config/notify/parse.go | 27 ++- internal/event/target/amqp.go | 7 +- internal/event/target/elasticsearch.go | 7 +- internal/event/target/kafka.go | 178 ++++++++++---------- internal/event/target/mqtt.go | 7 +- internal/event/target/mysql.go | 7 +- internal/event/target/nats.go | 7 +- internal/event/target/nsq.go | 7 +- internal/event/target/postgresql.go | 7 +- internal/event/target/redis.go | 7 +- internal/event/target/webhook.go | 7 +- internal/logger/target/http/http.go | 6 +- internal/logger/target/kafka/kafka.go | 7 +- internal/store/batch.go | 149 ++++++++++------- internal/store/batch_test.go | 223 ++++++++++++++++++------- internal/store/queuestore.go | 206 +++++++++++++++-------- internal/store/queuestore_test.go | 205 +++++++++++++++++------ internal/store/store.go | 82 ++++++--- internal/store/store_test.go | 146 ++++++++++++++++ 20 files changed, 907 insertions(+), 397 deletions(-) create mode 100644 internal/store/store_test.go diff --git a/internal/config/notify/help.go b/internal/config/notify/help.go index 4d07af917..343f46c7b 100644 --- a/internal/config/notify/help.go +++ b/internal/config/notify/help.go @@ -274,6 +274,18 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: target.KafkaBatchSize, + Description: "batch size of the events; used only when queue_dir is set", + Optional: true, + Type: "number", + }, + config.HelpKV{ + Key: target.KafkaBatchCommitTimeout, + Description: "commit timeout set for the batch; used only when batch_size > 1", + Optional: true, + Type: "duration", + }, } HelpMQTT = config.HelpKVS{ diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 75091a67c..0f7dc4404 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -374,6 +374,10 @@ var ( Key: target.KafkaBatchSize, Value: "0", }, + config.KV{ + Key: target.KafkaBatchCommitTimeout, + Value: "0s", + }, config.KV{ Key: target.KafkaCompressionCodec, Value: "", @@ -463,14 +467,23 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs return nil, err } + batchCommitTimeoutEnv := target.EnvKafkaBatchCommitTimeout + if k != config.Default { + batchCommitTimeoutEnv = batchCommitTimeoutEnv + config.Default + k + } + batchCommitTimeout, err := time.ParseDuration(env.Get(batchCommitTimeoutEnv, kv.Get(target.KafkaBatchCommitTimeout))) + if err != nil { + return nil, err + } kafkaArgs := target.KafkaArgs{ - Enable: enabled, - Brokers: brokers, - Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)), - QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)), - QueueLimit: queueLimit, - Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)), - BatchSize: uint32(batchSize), + Enable: enabled, + Brokers: brokers, + Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)), + QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)), + QueueLimit: queueLimit, + Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)), + BatchSize: uint32(batchSize), + BatchCommitTimeout: batchCommitTimeout, } tlsEnableEnv := target.EnvKafkaTLS diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index 86f48f21a..c987088da 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -276,7 +276,8 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi // Save - saves the events to the store which will be replayed when the amqp connection is active. func (target *AMQPTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -302,7 +303,7 @@ func (target *AMQPTarget) SendFromStore(key store.Key) error { } defer ch.Close() - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -317,7 +318,7 @@ func (target *AMQPTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - does nothing and available for interface compatibility. diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 69116d39a..35532334e 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -202,7 +202,8 @@ func (target *ElasticsearchTarget) isActive() (bool, error) { // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. func (target *ElasticsearchTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -278,7 +279,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error { return err } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -296,7 +297,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - does nothing and available for interface compatibility. diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 2ebddd321..d6af69b8b 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -43,23 +43,24 @@ import ( // Kafka input constants const ( - KafkaBrokers = "brokers" - KafkaTopic = "topic" - KafkaQueueDir = "queue_dir" - KafkaQueueLimit = "queue_limit" - KafkaTLS = "tls" - KafkaTLSSkipVerify = "tls_skip_verify" - KafkaTLSClientAuth = "tls_client_auth" - KafkaSASL = "sasl" - KafkaSASLUsername = "sasl_username" - KafkaSASLPassword = "sasl_password" - KafkaSASLMechanism = "sasl_mechanism" - KafkaClientTLSCert = "client_tls_cert" - KafkaClientTLSKey = "client_tls_key" - KafkaVersion = "version" - KafkaBatchSize = "batch_size" - KafkaCompressionCodec = "compression_codec" - KafkaCompressionLevel = "compression_level" + KafkaBrokers = "brokers" + KafkaTopic = "topic" + KafkaQueueDir = "queue_dir" + KafkaQueueLimit = "queue_limit" + KafkaTLS = "tls" + KafkaTLSSkipVerify = "tls_skip_verify" + KafkaTLSClientAuth = "tls_client_auth" + KafkaSASL = "sasl" + KafkaSASLUsername = "sasl_username" + KafkaSASLPassword = "sasl_password" + KafkaSASLMechanism = "sasl_mechanism" + KafkaClientTLSCert = "client_tls_cert" + KafkaClientTLSKey = "client_tls_key" + KafkaVersion = "version" + KafkaBatchSize = "batch_size" + KafkaBatchCommitTimeout = "batch_commit_timeout" + KafkaCompressionCodec = "compression_codec" + KafkaCompressionLevel = "compression_level" EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE" EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS" @@ -77,6 +78,7 @@ const ( EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION" EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE" + EnvKafkaBatchCommitTimeout = "MINIO_NOTIFY_KAFKA_BATCH_COMMIT_TIMEOUT" EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC" EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL" ) @@ -91,14 +93,15 @@ var codecs = map[string]sarama.CompressionCodec{ // KafkaArgs - Kafka target arguments. type KafkaArgs struct { - Enable bool `json:"enable"` - Brokers []xnet.Host `json:"brokers"` - Topic string `json:"topic"` - QueueDir string `json:"queueDir"` - QueueLimit uint64 `json:"queueLimit"` - Version string `json:"version"` - BatchSize uint32 `json:"batchSize"` - TLS struct { + Enable bool `json:"enable"` + Brokers []xnet.Host `json:"brokers"` + Topic string `json:"topic"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` + Version string `json:"version"` + BatchSize uint32 `json:"batchSize"` + BatchCommitTimeout time.Duration `json:"batchCommitTimeout"` + TLS struct { Enable bool `json:"enable"` RootCAs *x509.CertPool `json:"-"` SkipVerify bool `json:"skipVerify"` @@ -146,6 +149,11 @@ func (k KafkaArgs) Validate() error { return errors.New("batch should be enabled only if queue dir is enabled") } } + if k.BatchCommitTimeout > 0 { + if k.QueueDir == "" || k.BatchSize <= 1 { + return errors.New("batch commit timeout should be set only if queue dir is enabled and batch size > 1") + } + } return nil } @@ -159,7 +167,7 @@ type KafkaTarget struct { producer sarama.SyncProducer config *sarama.Config store store.Store[event.Event] - batch *store.Batch[string, *sarama.ProducerMessage] + batch *store.Batch[event.Event] loggerOnce logger.LogOnce quitCh chan struct{} } @@ -199,7 +207,11 @@ func (target *KafkaTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + if target.batch != nil { + return target.batch.Add(eventData) + } + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -220,80 +232,59 @@ func (target *KafkaTarget) send(eventData event.Event) error { return err } -// SendFromStore - reads an event from store and sends it to Kafka. -func (target *KafkaTarget) SendFromStore(key store.Key) error { - if err := target.init(); err != nil { - return err +// sendMultiple sends multiple messages to the kafka. +func (target *KafkaTarget) sendMultiple(events []event.Event) error { + if target.producer == nil { + return store.ErrNotConnected } - - // If batch is enabled, the event will be batched in memory - // and will be committed once the batch is full. - if target.batch != nil { - return target.addToBatch(key) - } - - eventData, eErr := target.store.Get(key.Name) - if eErr != nil { - // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() - // Such events will not exist and wouldve been already been sent successfully. - if os.IsNotExist(eErr) { - return nil - } - return eErr - } - - if err := target.send(eventData); err != nil { - if isKafkaConnErr(err) { - return store.ErrNotConnected - } - return err - } - - // Delete the event from store. - return target.store.Del(key.Name) -} - -func (target *KafkaTarget) addToBatch(key store.Key) error { - if target.batch.IsFull() { - if err := target.commitBatch(); err != nil { + var msgs []*sarama.ProducerMessage + for _, event := range events { + msg, err := target.toProducerMessage(event) + if err != nil { return err } + msgs = append(msgs, msg) } - if _, ok := target.batch.GetByKey(key.Name); !ok { - eventData, err := target.store.Get(key.Name) + return target.producer.SendMessages(msgs) +} + +// SendFromStore - reads an event from store and sends it to Kafka. +func (target *KafkaTarget) SendFromStore(key store.Key) (err error) { + if err = target.init(); err != nil { + return err + } + switch { + case key.ItemCount == 1: + var event event.Event + event, err = target.store.Get(key) + if err != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(err) { + return nil + } + return err + } + err = target.send(event) + case key.ItemCount > 1: + var events []event.Event + events, err = target.store.GetMultiple(key) if err != nil { if os.IsNotExist(err) { return nil } return err } - msg, err := target.toProducerMessage(eventData) - if err != nil { - return err - } - if err = target.batch.Add(key.Name, msg); err != nil { - return err - } + err = target.sendMultiple(events) } - // commit the batch if the key is the last one present in the store. - if key.IsLast || target.batch.IsFull() { - return target.commitBatch() - } - return nil -} - -func (target *KafkaTarget) commitBatch() error { - keys, msgs, err := target.batch.GetAll() if err != nil { - return err - } - if err = target.producer.SendMessages(msgs); err != nil { if isKafkaConnErr(err) { return store.ErrNotConnected } return err } - return target.store.DelList(keys) + // Delete the event from store. + return target.store.Del(key) } func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.ProducerMessage, error) { @@ -319,7 +310,18 @@ func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.Pro func (target *KafkaTarget) Close() error { close(target.quitCh) + if target.batch != nil { + target.batch.Close() + } + if target.producer != nil { + if target.store != nil { + // It is safe to abort the current transaction if + // queue_dir is configured + target.producer.AbortTxn() + } else { + target.producer.CommitTxn() + } target.producer.Close() return target.client.Close() } @@ -442,10 +444,14 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk loggerOnce: loggerOnce, quitCh: make(chan struct{}), } - if target.store != nil { if args.BatchSize > 1 { - target.batch = store.NewBatch[string, *sarama.ProducerMessage](args.BatchSize) + target.batch = store.NewBatch[event.Event](store.BatchConfig[event.Event]{ + Limit: args.BatchSize, + Log: loggerOnce, + Store: queueStore, + CommitTimeout: args.BatchCommitTimeout, + }) } store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index b390a6834..8f568cd3a 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -180,7 +180,7 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error { return err } - eventData, err := target.store.Get(key.Name) + eventData, err := target.store.Get(key) if err != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -195,14 +195,15 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Save - saves the events to the store if queuestore is configured, which will // be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index 11f419ab5..acccedd89 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -198,7 +198,8 @@ func (target *MySQLTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -273,7 +274,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error { } } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -291,7 +292,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - closes underneath connections to MySQL database. diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index d6d781d73..b01aa141b 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -299,7 +299,8 @@ func (target *NATSTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Nats connection is active. func (target *NATSTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { @@ -353,7 +354,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error { return err } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -367,7 +368,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error { return err } - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - closes underneath connections to NATS server. diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index cc95f288e..a44cae108 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -147,7 +147,8 @@ func (target *NSQTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the nsq connection is active. func (target *NSQTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { @@ -188,7 +189,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error { return err } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -203,7 +204,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - closes underneath connections to NSQD server. diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index e46a766e3..1a99db673 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -196,7 +196,8 @@ func (target *PostgreSQLTarget) isActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. func (target *PostgreSQLTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { @@ -275,7 +276,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error { } } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. @@ -293,7 +294,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - closes underneath connections to PostgreSQL database. diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index b403367d6..78d6c7555 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -173,7 +173,8 @@ func (target *RedisTarget) isActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. func (target *RedisTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -252,7 +253,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error { target.firstPing = true } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and would've been already been sent successfully. @@ -270,7 +271,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - releases the resources used by the pool. diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index 4935b4061..e5dc4f699 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -146,7 +146,8 @@ func (target *WebhookTarget) isActive() (bool, error) { // which will be replayed when the webhook connection is active. func (target *WebhookTarget) Save(eventData event.Event) error { if target.store != nil { - return target.store.Put(eventData) + _, err := target.store.Put(eventData) + return err } if err := target.init(); err != nil { return err @@ -213,7 +214,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error { return err } - eventData, eErr := target.store.Get(key.Name) + eventData, eErr := target.store.Get(key) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and would've been already been sent successfully. @@ -231,7 +232,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error { } // Delete the event from store. - return target.store.Del(key.Name) + return target.store.Del(key) } // Close - does nothing and available for interface compatibility. diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 2b37db57e..182a1dc0b 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -418,7 +418,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { if !isDirQueue { err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout) } else { - err = h.store.PutMultiple(entries) + _, err = h.store.PutMultiple(entries) } if err != nil { @@ -530,7 +530,7 @@ func New(config Config) (*Target, error) { // SendFromStore - reads the log from store and sends it to webhook. func (h *Target) SendFromStore(key store.Key) (err error) { var eventData []byte - eventData, err = h.store.GetRaw(key.Name) + eventData, err = h.store.GetRaw(key) if err != nil { if os.IsNotExist(err) { return nil @@ -552,7 +552,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) { } // Delete the event from store. - return h.store.Del(key.Name) + return h.store.Del(key) } // Send the log message 'entry' to the http target. diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 29a0bf013..c66920419 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -315,7 +315,8 @@ func (h *Target) IsOnline(_ context.Context) bool { func (h *Target) Send(ctx context.Context, entry interface{}) error { if h.store != nil { // save the entry to the queue store which will be replayed to the target. - return h.store.Put(entry) + _, err := h.store.Put(entry) + return err } h.logChMu.RLock() defer h.logChMu.RUnlock() @@ -344,7 +345,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { // SendFromStore - reads the log from store and sends it to kafka. func (h *Target) SendFromStore(key store.Key) (err error) { - auditEntry, err := h.store.Get(key.Name) + auditEntry, err := h.store.Get(key) if err != nil { if os.IsNotExist(err) { return nil @@ -358,7 +359,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) { return } // Delete the event from store. - return h.store.Del(key.Name) + return h.store.Del(key) } // Cancel - cancels the target diff --git a/internal/store/batch.go b/internal/store/batch.go index cba034f4b..bb31135c9 100644 --- a/internal/store/batch.go +++ b/internal/store/batch.go @@ -18,100 +18,123 @@ package store import ( + "context" "errors" - "fmt" "sync" + "time" ) // ErrBatchFull indicates that the batch is full var ErrBatchFull = errors.New("batch is full") -type key interface { - string | int | int64 -} +const defaultCommitTimeout = 30 * time.Second // Batch represents an ordered batch -type Batch[K key, T any] struct { - keys []K - items map[K]T - limit uint32 +type Batch[I any] struct { + items []I + limit uint32 + store Store[I] + quitCh chan struct{} sync.Mutex } +// BatchConfig represents the batch config +type BatchConfig[I any] struct { + Limit uint32 + Store Store[I] + CommitTimeout time.Duration + Log logger +} + // Add adds the item to the batch -func (b *Batch[K, T]) Add(key K, item T) error { +func (b *Batch[I]) Add(item I) error { b.Lock() defer b.Unlock() if b.isFull() { - return ErrBatchFull + if b.store == nil { + return ErrBatchFull + } + // commit batch to store + if err := b.commit(); err != nil { + return err + } } - if _, ok := b.items[key]; !ok { - b.keys = append(b.keys, key) - } - b.items[key] = item - + b.items = append(b.items, item) return nil } -// GetAll fetches the items and resets the batch -// Returned items are not referenced by the batch -func (b *Batch[K, T]) GetAll() (orderedKeys []K, orderedItems []T, err error) { - b.Lock() - defer b.Unlock() - - orderedKeys = append([]K(nil), b.keys...) - for _, key := range orderedKeys { - item, ok := b.items[key] - if !ok { - err = fmt.Errorf("item not found for the key: %v; should not happen;", key) - return - } - orderedItems = append(orderedItems, item) - delete(b.items, key) - } - - b.keys = b.keys[:0] - - return -} - -// GetByKey will get the batch item by the provided key -func (b *Batch[K, T]) GetByKey(key K) (T, bool) { - b.Lock() - defer b.Unlock() - - item, ok := b.items[key] - return item, ok -} - // Len returns the no of items in the batch -func (b *Batch[K, T]) Len() int { +func (b *Batch[_]) Len() int { b.Lock() defer b.Unlock() - return len(b.keys) + return len(b.items) } -// IsFull checks if the batch is full or not -func (b *Batch[K, T]) IsFull() bool { - b.Lock() - defer b.Unlock() - - return b.isFull() -} - -func (b *Batch[K, T]) isFull() bool { +func (b *Batch[_]) isFull() bool { return len(b.items) >= int(b.limit) } -// NewBatch creates a new batch -func NewBatch[K key, T any](limit uint32) *Batch[K, T] { - return &Batch[K, T]{ - keys: make([]K, 0, limit), - items: make(map[K]T, limit), - limit: limit, +func (b *Batch[I]) commit() error { + switch len(b.items) { + case 0: + return nil + case 1: + _, err := b.store.Put(b.items[0]) + return err + default: } + if _, err := b.store.PutMultiple(b.items); err != nil { + return err + } + b.items = make([]I, 0, b.limit) + return nil +} + +// Close commits the pending items and quits the goroutines +func (b *Batch[I]) Close() error { + defer func() { + close(b.quitCh) + }() + + b.Lock() + defer b.Unlock() + return b.commit() +} + +// NewBatch creates a new batch +func NewBatch[I any](config BatchConfig[I]) *Batch[I] { + if config.CommitTimeout == 0 { + config.CommitTimeout = defaultCommitTimeout + } + quitCh := make(chan struct{}) + batch := &Batch[I]{ + items: make([]I, 0, config.Limit), + limit: config.Limit, + store: config.Store, + quitCh: quitCh, + } + if batch.store != nil { + go func() { + commitTicker := time.NewTicker(config.CommitTimeout) + defer commitTicker.Stop() + for { + select { + case <-commitTicker.C: + case <-batch.quitCh: + return + } + batch.Lock() + err := batch.commit() + batch.Unlock() + if err != nil { + config.Log(context.Background(), err, "") + } + } + }() + } + return batch } diff --git a/internal/store/batch_test.go b/internal/store/batch_test.go index db8b95dc8..faf9d4556 100644 --- a/internal/store/batch_test.go +++ b/internal/store/batch_test.go @@ -18,109 +18,202 @@ package store import ( - "errors" + "context" "sync" "testing" + "time" ) -func TestBatch(t *testing.T) { +func TestBatchCommit(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 100) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + var limit uint32 = 100 - batch := NewBatch[int, int](limit) + + batch := NewBatch[TestItem](BatchConfig[TestItem]{ + Limit: limit, + Store: store, + CommitTimeout: 5 * time.Minute, + Log: func(ctx context.Context, err error, id string, errKind ...interface{}) { + t.Log(err) + }, + }) + defer batch.Close() + for i := 0; i < int(limit); i++ { - if err := batch.Add(i, i); err != nil { + if err := batch.Add(testItem); err != nil { t.Fatalf("failed to add %v; %v", i, err) } - if _, ok := batch.GetByKey(i); !ok { - t.Fatalf("failed to get the item by key %v after adding", i) - } - } - err := batch.Add(101, 101) - if err == nil || !errors.Is(err, ErrBatchFull) { - t.Fatalf("Expected err %v but got %v", ErrBatchFull, err) - } - if !batch.IsFull() { - t.Fatal("Expected batch.IsFull to be true but got false") } + batchLen := batch.Len() if batchLen != int(limit) { - t.Fatalf("expected batch length to be %v but got %v", limit, batchLen) + t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen) } - keys, items, err := batch.GetAll() - if err != nil { - t.Fatalf("unable to get the items from the batch; %v", err) + + keys := store.List() + if len(keys) > 0 { + t.Fatalf("Expected empty store list but got len(list) %v", len(keys)) } - if len(items) != int(limit) { - t.Fatalf("Expected length of the batch items to be %v but got %v", limit, len(items)) - } - if len(keys) != int(limit) { - t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items)) + if err := batch.Add(testItem); err != nil { + t.Fatalf("unable to add to the batch; %v", err) } batchLen = batch.Len() - if batchLen != 0 { - t.Fatalf("expected batch to be empty but still left with %d items", batchLen) - } - // Add duplicate entries - for i := 0; i < 10; i++ { - if err := batch.Add(99, 99); err != nil { - t.Fatalf("failed to add duplicate item %v to batch after Get; %v", i, err) - } - } - if _, ok := batch.GetByKey(99); !ok { - t.Fatal("failed to get the duplicxate item by key '99' after adding") - } - keys, items, err = batch.GetAll() - if err != nil { - t.Fatalf("unable to get the items from the batch; %v", err) - } - if len(items) != 1 { - t.Fatalf("Expected length of the batch items to be 1 but got %v", len(items)) + if batchLen != 1 { + t.Fatalf("expected batch length to be 1 but got %v", batchLen) } + keys = store.List() if len(keys) != 1 { - t.Fatalf("Expected length of the batch keys to be 1 but got %v", len(items)) + t.Fatalf("expected len(store.List())=1; but got %v", len(keys)) } - // try adding again after Get. + key := keys[0] + if !key.Compress { + t.Fatal("expected key.Compress=true; but got false") + } + if key.ItemCount != int(limit) { + t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount) + } + items, err := store.GetMultiple(key) + if err != nil { + t.Fatalf("unable to read key %v; %v", key.String(), err) + } + if len(items) != int(limit) { + t.Fatalf("expected len(items)=%d; but got %v", limit, len(items)) + } +} + +func TestBatchCommitOnExit(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 100) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + + var limit uint32 = 100 + + batch := NewBatch[TestItem](BatchConfig[TestItem]{ + Limit: limit, + Store: store, + CommitTimeout: 5 * time.Minute, + Log: func(ctx context.Context, err error, id string, errKind ...interface{}) { + t.Log([]any{err, id, errKind}...) + }, + }) + for i := 0; i < int(limit); i++ { - if err := batch.Add(i, i); err != nil { - t.Fatalf("failed to add item %v to batch after Get; %v", i, err) - } - if _, ok := batch.GetByKey(i); !ok { - t.Fatalf("failed to get the item by key %v after adding", i) + if err := batch.Add(testItem); err != nil { + t.Fatalf("failed to add %v; %v", i, err) } } + + batch.Close() + time.Sleep(1 * time.Second) + + batchLen := batch.Len() + if batchLen != 0 { + t.Fatalf("Expected batch.Len()=0; but got %v", batchLen) + } + + keys := store.List() + if len(keys) != 1 { + t.Fatalf("expected len(store.List())=1; but got %v", len(keys)) + } + + key := keys[0] + if !key.Compress { + t.Fatal("expected key.Compress=true; but got false") + } + if key.ItemCount != int(limit) { + t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount) + } + items, err := store.GetMultiple(key) + if err != nil { + t.Fatalf("unable to read key %v; %v", key.String(), err) + } + if len(items) != int(limit) { + t.Fatalf("expected len(items)=%d; but got %v", limit, len(items)) + } } func TestBatchWithConcurrency(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 100) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + var limit uint32 = 100 - batch := NewBatch[int, int](limit) + + batch := NewBatch[TestItem](BatchConfig[TestItem]{ + Limit: limit, + Store: store, + CommitTimeout: 5 * time.Minute, + Log: func(ctx context.Context, err error, id string, errKind ...interface{}) { + t.Log(err) + }, + }) + defer batch.Close() var wg sync.WaitGroup for i := 0; i < int(limit); i++ { wg.Add(1) - go func(item int) { + go func(key int) { defer wg.Done() - if err := batch.Add(item, item); err != nil { - t.Errorf("failed to add item %v; %v", item, err) + if err := batch.Add(testItem); err != nil { + t.Errorf("failed to add item %v; %v", key, err) return } - if _, ok := batch.GetByKey(item); !ok { - t.Errorf("failed to get the item by key %v after adding", item) - } }(i) } wg.Wait() - keys, items, err := batch.GetAll() + batchLen := batch.Len() + if batchLen != int(limit) { + t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen) + } + + keys := store.List() + if len(keys) > 0 { + t.Fatalf("Expected empty store list but got len(list) %v", len(keys)) + } + if err := batch.Add(testItem); err != nil { + t.Fatalf("unable to add to the batch; %v", err) + } + batchLen = batch.Len() + if batchLen != 1 { + t.Fatalf("expected batch length to be 1 but got %v", batchLen) + } + keys = store.List() + if len(keys) != 1 { + t.Fatalf("expected len(store.List())=1; but got %v", len(keys)) + } + key := keys[0] + if !key.Compress { + t.Fatal("expected key.Compress=true; but got false") + } + if key.ItemCount != int(limit) { + t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount) + } + items, err := store.GetMultiple(key) if err != nil { - t.Fatalf("unable to get the items from the batch; %v", err) + t.Fatalf("unable to read key %v; %v", key.String(), err) } if len(items) != int(limit) { - t.Fatalf("expected batch length %v but got %v", limit, len(items)) - } - if len(keys) != int(limit) { - t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items)) - } - batchLen := batch.Len() - if batchLen != 0 { - t.Fatalf("expected batch to be empty but still left with %d items", batchLen) + t.Fatalf("expected len(items)=%d; but got %v", limit, len(items)) } } diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index 002202f49..1ee6278bb 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -18,24 +18,25 @@ package store import ( + "bytes" "encoding/json" "errors" - "fmt" "os" "path/filepath" "sort" - "strings" "sync" "time" "github.com/google/uuid" jsoniter "github.com/json-iterator/go" + "github.com/klauspost/compress/s2" "github.com/valyala/bytebufferpool" ) const ( defaultLimit = 100000 // Default store limit. defaultExt = ".unknown" + compressExt = ".snappy" ) // errLimitExceeded error is sent when the maximum limit is reached. @@ -83,18 +84,12 @@ func (store *QueueStore[_]) Open() error { return err } - // Truncate entries. - if uint64(len(files)) > store.entryLimit { - files = files[:store.entryLimit] - } - for _, file := range files { if file.IsDir() { continue } - key := strings.TrimSuffix(file.Name(), store.fileExt) if fi, err := file.Info(); err == nil { - store.entries[key] = fi.ModTime().UnixNano() + store.entries[file.Name()] = fi.ModTime().UnixNano() } } @@ -107,96 +102,138 @@ func (store *QueueStore[_]) Delete() error { } // PutMultiple - puts an item to the store. -func (store *QueueStore[I]) PutMultiple(item []I) error { +func (store *QueueStore[I]) PutMultiple(items []I) (Key, error) { // Generate a new UUID for the key. - key, err := uuid.NewRandom() + uid, err := uuid.NewRandom() if err != nil { - return err + return Key{}, err } store.Lock() defer store.Unlock() if uint64(len(store.entries)) >= store.entryLimit { - return errLimitExceeded + return Key{}, errLimitExceeded } - return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item) + key := Key{ + Name: uid.String(), + ItemCount: len(items), + Compress: true, + Extension: store.fileExt, + } + return key, store.multiWrite(key, items) } // multiWrite - writes an item to the directory. -func (store *QueueStore[I]) multiWrite(key string, item []I) error { +func (store *QueueStore[I]) multiWrite(key Key, items []I) (err error) { buf := bytebufferpool.Get() defer bytebufferpool.Put(buf) enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) - for i := range item { - err := enc.Encode(item[i]) - if err != nil { + for i := range items { + if err = enc.Encode(items[i]); err != nil { return err } } - b := buf.Bytes() - path := filepath.Join(store.directory, key+store.fileExt) - err := os.WriteFile(path, b, os.FileMode(0o770)) + path := filepath.Join(store.directory, key.String()) + if key.Compress { + err = os.WriteFile(path, s2.Encode(nil, buf.Bytes()), os.FileMode(0o770)) + } else { + err = os.WriteFile(path, buf.Bytes(), os.FileMode(0o770)) + } + buf.Reset() if err != nil { return err } // Increment the item count. - store.entries[key] = time.Now().UnixNano() + store.entries[key.String()] = time.Now().UnixNano() - return nil + return } // write - writes an item to the directory. -func (store *QueueStore[I]) write(key string, item I) error { +func (store *QueueStore[I]) write(key Key, item I) error { // Marshalls the item. eventData, err := json.Marshal(item) if err != nil { return err } + return store.writeBytes(key, eventData) +} - path := filepath.Join(store.directory, key+store.fileExt) - if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil { - return err +// writeBytes - writes bytes to the directory. +func (store *QueueStore[I]) writeBytes(key Key, b []byte) (err error) { + path := filepath.Join(store.directory, key.String()) + + if key.Compress { + err = os.WriteFile(path, s2.Encode(nil, b), os.FileMode(0o770)) + } else { + err = os.WriteFile(path, b, os.FileMode(0o770)) } + if err != nil { + return err + } // Increment the item count. - store.entries[key] = time.Now().UnixNano() - + store.entries[key.String()] = time.Now().UnixNano() return nil } // Put - puts an item to the store. -func (store *QueueStore[I]) Put(item I) error { +func (store *QueueStore[I]) Put(item I) (Key, error) { store.Lock() defer store.Unlock() if uint64(len(store.entries)) >= store.entryLimit { - return errLimitExceeded + return Key{}, errLimitExceeded } // Generate a new UUID for the key. - key, err := uuid.NewRandom() + uid, err := uuid.NewRandom() if err != nil { - return err + return Key{}, err } - return store.write(key.String(), item) + key := Key{ + Name: uid.String(), + Extension: store.fileExt, + ItemCount: 1, + } + return key, store.write(key, item) +} + +// PutRaw - puts the raw bytes to the store +func (store *QueueStore[I]) PutRaw(b []byte) (Key, error) { + store.Lock() + defer store.Unlock() + if uint64(len(store.entries)) >= store.entryLimit { + return Key{}, errLimitExceeded + } + // Generate a new UUID for the key. + uid, err := uuid.NewRandom() + if err != nil { + return Key{}, err + } + key := Key{ + Name: uid.String(), + Extension: store.fileExt, + } + return key, store.writeBytes(key, b) } // GetRaw - gets an item from the store. -func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) { +func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) { store.RLock() defer func(store *QueueStore[I]) { store.RUnlock() - if err != nil { + if err != nil && !os.IsNotExist(err) { // Upon error we remove the entry. store.Del(key) } }(store) - raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) + raw, err = os.ReadFile(filepath.Join(store.directory, key.String())) if err != nil { return } @@ -209,19 +246,19 @@ func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) { } // Get - gets an item from the store. -func (store *QueueStore[I]) Get(key string) (item I, err error) { +func (store *QueueStore[I]) Get(key Key) (item I, err error) { store.RLock() defer func(store *QueueStore[I]) { store.RUnlock() - if err != nil { + if err != nil && !os.IsNotExist(err) { // Upon error we remove the entry. store.Del(key) } }(store) var eventData []byte - eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) + eventData, err = os.ReadFile(filepath.Join(store.directory, key.String())) if err != nil { return item, err } @@ -237,28 +274,52 @@ func (store *QueueStore[I]) Get(key string) (item I, err error) { return item, nil } +// GetMultiple will read the multi payload file and fetch the items +func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) { + store.RLock() + + defer func(store *QueueStore[I]) { + store.RUnlock() + if err != nil && !os.IsNotExist(err) { + // Upon error we remove the entry. + store.Del(key) + } + }(store) + + raw, err := os.ReadFile(filepath.Join(store.directory, key.String())) + if err != nil { + return + } + + var decoder *jsoniter.Decoder + if key.Compress { + decodedBytes, err := s2.Decode(nil, raw) + if err != nil { + return nil, err + } + decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes)) + } else { + decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw)) + } + + for decoder.More() { + var item I + if err := decoder.Decode(&item); err != nil { + return nil, err + } + items = append(items, item) + } + + return +} + // Del - Deletes an entry from the store. -func (store *QueueStore[_]) Del(key string) error { +func (store *QueueStore[_]) Del(key Key) error { store.Lock() defer store.Unlock() return store.del(key) } -// DelList - Deletes a list of entries from the store. -// Returns an error even if one key fails to be deleted. -func (store *QueueStore[_]) DelList(keys []string) error { - store.Lock() - defer store.Unlock() - - for _, key := range keys { - if err := store.del(key); err != nil { - return err - } - } - - return nil -} - // Len returns the entry count. func (store *QueueStore[_]) Len() int { store.RLock() @@ -268,30 +329,35 @@ func (store *QueueStore[_]) Len() int { } // lockless call -func (store *QueueStore[_]) del(key string) error { - err := os.Remove(filepath.Join(store.directory, key+store.fileExt)) +func (store *QueueStore[_]) del(key Key) error { + err := os.Remove(filepath.Join(store.directory, key.String())) // Delete as entry no matter the result - delete(store.entries, key) + delete(store.entries, key.String()) return err } // List - lists all files registered in the store. -func (store *QueueStore[_]) List() ([]string, error) { +func (store *QueueStore[_]) List() (keys []Key) { store.RLock() - l := make([]string, 0, len(store.entries)) - for k := range store.entries { - l = append(l, k) + defer store.RUnlock() + + entries := make([]string, 0, len(store.entries)) + for entry := range store.entries { + entries = append(entries, entry) } // Sort entries... - sort.Slice(l, func(i, j int) bool { - return store.entries[l[i]] < store.entries[l[j]] + sort.Slice(entries, func(i, j int) bool { + return store.entries[entries[i]] < store.entries[entries[j]] }) - store.RUnlock() - return l, nil + for i := range entries { + keys = append(keys, parseKey(entries[i])) + } + + return keys } // list will read all entries from disk. @@ -318,9 +384,3 @@ func (store *QueueStore[_]) list() ([]os.DirEntry, error) { return files, nil } - -// Extension will return the file extension used -// for the items stored in the queue. -func (store *QueueStore[_]) Extension() string { - return store.fileExt -} diff --git a/internal/store/queuestore_test.go b/internal/store/queuestore_test.go index 680cb177b..bcb47049d 100644 --- a/internal/store/queuestore_test.go +++ b/internal/store/queuestore_test.go @@ -18,10 +18,10 @@ package store import ( + "fmt" "os" "path/filepath" "reflect" - "strings" "testing" ) @@ -66,17 +66,14 @@ func TestQueueStorePut(t *testing.T) { } // Put 100 items. for i := 0; i < 100; i++ { - if err := store.Put(testItem); err != nil { + if _, err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } // Count the items. - names, err := store.List() - if err != nil { - t.Fatal(err) - } - if len(names) != 100 { - t.Fatalf("List() Expected: 100, got %d", len(names)) + keys := store.List() + if len(keys) != 100 { + t.Fatalf("List() Expected: 100, got %d", len(keys)) } } @@ -93,18 +90,15 @@ func TestQueueStoreGet(t *testing.T) { } // Put 10 items for i := 0; i < 10; i++ { - if err := store.Put(testItem); err != nil { + if _, err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - itemKeys, err := store.List() - if err != nil { - t.Fatal(err) - } + itemKeys := store.List() // Get 10 items. if len(itemKeys) == 10 { for _, key := range itemKeys { - item, eErr := store.Get(strings.TrimSuffix(key, testItemExt)) + item, eErr := store.Get(key) if eErr != nil { t.Fatal("Failed to Get the item from the queue store ", eErr) } @@ -130,18 +124,15 @@ func TestQueueStoreDel(t *testing.T) { } // Put 20 items. for i := 0; i < 20; i++ { - if err := store.Put(testItem); err != nil { + if _, err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - itemKeys, err := store.List() - if err != nil { - t.Fatal(err) - } + itemKeys := store.List() // Remove all the items. if len(itemKeys) == 20 { for _, key := range itemKeys { - err := store.Del(strings.TrimSuffix(key, testItemExt)) + err := store.Del(key) if err != nil { t.Fatal("queue store Del failed with ", err) } @@ -150,12 +141,9 @@ func TestQueueStoreDel(t *testing.T) { t.Fatalf("List() Expected: 20, got %d", len(itemKeys)) } - names, err := store.List() - if err != nil { - t.Fatal(err) - } - if len(names) != 0 { - t.Fatalf("List() Expected: 0, got %d", len(names)) + keys := store.List() + if len(keys) != 0 { + t.Fatalf("List() Expected: 0, got %d", len(keys)) } } @@ -172,12 +160,12 @@ func TestQueueStoreLimit(t *testing.T) { t.Fatal("Failed to create a queue store ", err) } for i := 0; i < 5; i++ { - if err := store.Put(testItem); err != nil { + if _, err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } // Should not allow 6th Put. - if err := store.Put(testItem); err == nil { + if _, err := store.Put(testItem); err == nil { t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) } } @@ -194,18 +182,15 @@ func TestQueueStoreListN(t *testing.T) { t.Fatal("Failed to create a queue store ", err) } for i := 0; i < 10; i++ { - if err := store.Put(testItem); err != nil { + if _, err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } // Should return all the item keys in the store. - names, err := store.List() - if err != nil { - t.Fatal(err) - } + keys := store.List() - if len(names) != 10 { - t.Fatalf("List() Expected: 10, got %d", len(names)) + if len(keys) != 10 { + t.Fatalf("List() Expected: 10, got %d", len(keys)) } // re-open @@ -213,28 +198,154 @@ func TestQueueStoreListN(t *testing.T) { if err != nil { t.Fatal("Failed to create a queue store ", err) } - names, err = store.List() - if err != nil { - t.Fatal(err) - } + keys = store.List() - if len(names) != 10 { - t.Fatalf("List() Expected: 10, got %d", len(names)) + if len(keys) != 10 { + t.Fatalf("List() Expected: 10, got %d", len(keys)) } - if len(names) != store.Len() { - t.Fatalf("List() Expected: 10, got %d", len(names)) + if len(keys) != store.Len() { + t.Fatalf("List() Expected: 10, got %d", len(keys)) } // Delete all - for _, key := range names { + for _, key := range keys { err := store.Del(key) if err != nil { t.Fatal(err) } } // Re-list - lst, err := store.List() - if len(lst) > 0 || err != nil { - t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", lst, err) + keys = store.List() + if len(keys) > 0 || err != nil { + t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err) + } +} + +func TestMultiplePutGets(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 10) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + // TestItem{Name: "test-item", Property: "property"} + var items []TestItem + for i := 0; i < 10; i++ { + items = append(items, TestItem{ + Name: fmt.Sprintf("test-item-%d", i), + Property: "property", + }) + } + + if _, err := store.PutMultiple(items); err != nil { + t.Fatalf("failed to put multiple; %v", err) + } + + keys := store.List() + if len(keys) != 1 { + t.Fatalf("expected len(keys)=1, but found %d", len(keys)) + } + + key := keys[0] + if !key.Compress { + t.Fatal("expected the item to be compressed") + } + if key.ItemCount != 10 { + t.Fatalf("expected itemcount=10 but found %v", key.ItemCount) + } + + resultItems, err := store.GetMultiple(key) + if err != nil { + t.Fatalf("unable to get multiple items; %v", err) + } + + if !reflect.DeepEqual(resultItems, items) { + t.Fatalf("expected item list: %v; but got %v", items, resultItems) + } + + if err := store.Del(key); err != nil { + t.Fatalf("unable to Del; %v", err) + } + + // Re-list + keys = store.List() + if len(keys) > 0 || err != nil { + t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err) + } +} + +func TestMixedPutGets(t *testing.T) { + defer func() { + if err := tearDownQueueStore(); err != nil { + t.Fatalf("Failed to tear down store; %v", err) + } + }() + store, err := setUpQueueStore(queueDir, 10) + if err != nil { + t.Fatalf("Failed to create a queue store; %v", err) + } + // TestItem{Name: "test-item", Property: "property"} + var items []TestItem + for i := 0; i < 5; i++ { + items = append(items, TestItem{ + Name: fmt.Sprintf("test-item-%d", i), + Property: "property", + }) + } + if _, err := store.PutMultiple(items); err != nil { + t.Fatalf("failed to put multiple; %v", err) + } + + for i := 5; i < 10; i++ { + item := TestItem{ + Name: fmt.Sprintf("test-item-%d", i), + Property: "property", + } + if _, err := store.Put(item); err != nil { + t.Fatalf("unable to store.Put(); %v", err) + } + items = append(items, item) + } + + keys := store.List() + if len(keys) != 6 { + // 1 multiple + 5 single PUTs + t.Fatalf("expected len(keys)=6, but found %d", len(keys)) + } + + var resultItems []TestItem + for _, key := range keys { + if key.ItemCount > 1 { + items, err := store.GetMultiple(key) + if err != nil { + t.Fatalf("unable to get multiple items; %v", err) + } + resultItems = append(resultItems, items...) + continue + } + item, err := store.Get(key) + if err != nil { + t.Fatalf("unable to get item; %v", err) + } + resultItems = append(resultItems, item) + } + + if !reflect.DeepEqual(resultItems, items) { + t.Fatalf("expected item list: %v; but got %v", items, resultItems) + } + + // Delete all + for _, key := range keys { + if err := store.Del(key); err != nil { + t.Fatalf("unable to Del; %v", err) + } + } + // Re-list + keys = store.List() + if len(keys) > 0 || err != nil { + t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err) } } diff --git a/internal/store/store.go b/internal/store/store.go index a72721856..fcd76dc60 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" @@ -44,23 +45,64 @@ type Target interface { // Store - Used to persist items. type Store[I any] interface { - Put(item I) error - PutMultiple(item []I) error - Get(key string) (I, error) - GetRaw(key string) ([]byte, error) + Put(item I) (Key, error) + PutMultiple(item []I) (Key, error) + Get(key Key) (I, error) + GetMultiple(key Key) ([]I, error) + GetRaw(key Key) ([]byte, error) + PutRaw(b []byte) (Key, error) Len() int - List() ([]string, error) - Del(key string) error - DelList(key []string) error + List() []Key + Del(key Key) error Open() error Delete() error - Extension() string } // Key denotes the key present in the store. type Key struct { - Name string - IsLast bool + Name string + Compress bool + Extension string + ItemCount int +} + +// String returns the filepath name +func (k Key) String() string { + keyStr := k.Name + if k.ItemCount > 1 { + keyStr = fmt.Sprintf("%d:%s", k.ItemCount, k.Name) + } + return keyStr + k.Extension + func() string { + if k.Compress { + return compressExt + } + return "" + }() +} + +func getItemCount(k string) (count int, err error) { + count = 1 + v := strings.Split(k, ":") + if len(v) == 2 { + return strconv.Atoi(v[0]) + } + return +} + +func parseKey(k string) (key Key) { + key.Name = k + if strings.HasSuffix(k, compressExt) { + key.Compress = true + key.Name = strings.TrimSuffix(key.Name, compressExt) + } + if key.ItemCount, _ = getItemCount(k); key.ItemCount > 1 { + key.Name = strings.TrimPrefix(key.Name, fmt.Sprintf("%d:", key.ItemCount)) + } + if vals := strings.Split(key.Name, "."); len(vals) == 2 { + key.Extension = "." + vals[1] + key.Name = strings.TrimSuffix(key.Name, key.Extension) + } + return } // replayItems - Reads the items from the store and replays. @@ -74,18 +116,12 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id s defer retryTicker.Stop() for { - names, err := store.List() - if err != nil { - log(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) - } else { - keyCount := len(names) - for i, name := range names { - select { - case keyCh <- Key{strings.TrimSuffix(name, store.Extension()), keyCount == i+1}: - // Get next key. - case <-doneCh: - return - } + for _, key := range store.List() { + select { + case keyCh <- key: + // Get next key. + case <-doneCh: + return } } @@ -114,7 +150,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l logger( context.Background(), - fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err), + fmt.Errorf("unable to send log entry to '%s' err '%w'", target.Name(), err), target.Name(), ) diff --git a/internal/store/store_test.go b/internal/store/store_test.go new file mode 100644 index 000000000..4c05f3d73 --- /dev/null +++ b/internal/store/store_test.go @@ -0,0 +1,146 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package store + +import ( + "testing" +) + +func TestKeyString(t *testing.T) { + testCases := []struct { + key Key + expectedString string + }{ + { + key: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + }, + expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event", + }, + { + key: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Compress: true, + Extension: ".event", + ItemCount: 100, + }, + expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy", + }, + { + key: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + ItemCount: 100, + }, + expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event", + }, + { + key: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Compress: true, + Extension: ".event", + ItemCount: 1, + }, + expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy", + }, + { + key: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + ItemCount: 1, + }, + expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event", + }, + } + + for i, testCase := range testCases { + if testCase.expectedString != testCase.key.String() { + t.Fatalf("case[%v]: key.String() Expected: %s, got %s", i, testCase.expectedString, testCase.key.String()) + } + } +} + +func TestParseKey(t *testing.T) { + testCases := []struct { + str string + expectedKey Key + }{ + { + str: "01894394-d046-4783-ba0d-f1c6885790dc.event", + expectedKey: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + ItemCount: 1, + }, + }, + { + str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy", + expectedKey: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Compress: true, + Extension: ".event", + ItemCount: 100, + }, + }, + { + str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event", + expectedKey: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + ItemCount: 100, + }, + }, + { + str: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy", + expectedKey: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Compress: true, + Extension: ".event", + ItemCount: 1, + }, + }, + { + str: "01894394-d046-4783-ba0d-f1c6885790dc.event", + expectedKey: Key{ + Name: "01894394-d046-4783-ba0d-f1c6885790dc", + Extension: ".event", + ItemCount: 1, + }, + }, + } + + for i, testCase := range testCases { + key := parseKey(testCase.str) + if testCase.expectedKey.Name != key.Name { + t.Fatalf("case[%v]: Expected key.Name: %v, got %v", i, testCase.expectedKey.Name, key.Name) + } + if testCase.expectedKey.Compress != key.Compress { + t.Fatalf("case[%v]: Expected key.Compress: %v, got %v", i, testCase.expectedKey.Compress, key.Compress) + } + if testCase.expectedKey.Extension != key.Extension { + t.Fatalf("case[%v]: Expected key.Extension: %v, got %v", i, testCase.expectedKey.Extension, key.Extension) + } + if testCase.expectedKey.ItemCount != key.ItemCount { + t.Fatalf("case[%v]: Expected key.ItemCount: %v, got %v", i, testCase.expectedKey.ItemCount, key.ItemCount) + } + if testCase.expectedKey.String() != key.String() { + t.Fatalf("case[%v]: Expected key.String(): %v, got %v", i, testCase.expectedKey.String(), key.String()) + } + } +}