Kafka notify: support batched commits for queue store (#20377)

The items will be saved per target batch and will
be committed to the queue store when the batch is full

Also, periodically commit the batched items to the queue store
based on configured commit_timeout; default is 30s;

Bonus: compress queue store multi writes
This commit is contained in:
Praveen raj Mani 2024-09-07 04:36:30 +05:30 committed by GitHub
parent 0f1e8db4c5
commit 261111e728
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 907 additions and 397 deletions

View File

@ -274,6 +274,18 @@ var (
Optional: true, Optional: true,
Type: "number", Type: "number",
}, },
config.HelpKV{
Key: target.KafkaBatchSize,
Description: "batch size of the events; used only when queue_dir is set",
Optional: true,
Type: "number",
},
config.HelpKV{
Key: target.KafkaBatchCommitTimeout,
Description: "commit timeout set for the batch; used only when batch_size > 1",
Optional: true,
Type: "duration",
},
} }
HelpMQTT = config.HelpKVS{ HelpMQTT = config.HelpKVS{

View File

@ -374,6 +374,10 @@ var (
Key: target.KafkaBatchSize, Key: target.KafkaBatchSize,
Value: "0", Value: "0",
}, },
config.KV{
Key: target.KafkaBatchCommitTimeout,
Value: "0s",
},
config.KV{ config.KV{
Key: target.KafkaCompressionCodec, Key: target.KafkaCompressionCodec,
Value: "", Value: "",
@ -463,14 +467,23 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
return nil, err return nil, err
} }
batchCommitTimeoutEnv := target.EnvKafkaBatchCommitTimeout
if k != config.Default {
batchCommitTimeoutEnv = batchCommitTimeoutEnv + config.Default + k
}
batchCommitTimeout, err := time.ParseDuration(env.Get(batchCommitTimeoutEnv, kv.Get(target.KafkaBatchCommitTimeout)))
if err != nil {
return nil, err
}
kafkaArgs := target.KafkaArgs{ kafkaArgs := target.KafkaArgs{
Enable: enabled, Enable: enabled,
Brokers: brokers, Brokers: brokers,
Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)), Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)), QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)),
QueueLimit: queueLimit, QueueLimit: queueLimit,
Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)), Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)),
BatchSize: uint32(batchSize), BatchSize: uint32(batchSize),
BatchCommitTimeout: batchCommitTimeout,
} }
tlsEnableEnv := target.EnvKafkaTLS tlsEnableEnv := target.EnvKafkaTLS

View File

@ -276,7 +276,8 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi
// Save - saves the events to the store which will be replayed when the amqp connection is active. // Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error { func (target *AMQPTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -302,7 +303,7 @@ func (target *AMQPTarget) SendFromStore(key store.Key) error {
} }
defer ch.Close() defer ch.Close()
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -317,7 +318,7 @@ func (target *AMQPTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - does nothing and available for interface compatibility. // Close - does nothing and available for interface compatibility.

View File

@ -202,7 +202,8 @@ func (target *ElasticsearchTarget) isActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error { func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -278,7 +279,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error {
return err return err
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -296,7 +297,7 @@ func (target *ElasticsearchTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - does nothing and available for interface compatibility. // Close - does nothing and available for interface compatibility.

View File

@ -43,23 +43,24 @@ import (
// Kafka input constants // Kafka input constants
const ( const (
KafkaBrokers = "brokers" KafkaBrokers = "brokers"
KafkaTopic = "topic" KafkaTopic = "topic"
KafkaQueueDir = "queue_dir" KafkaQueueDir = "queue_dir"
KafkaQueueLimit = "queue_limit" KafkaQueueLimit = "queue_limit"
KafkaTLS = "tls" KafkaTLS = "tls"
KafkaTLSSkipVerify = "tls_skip_verify" KafkaTLSSkipVerify = "tls_skip_verify"
KafkaTLSClientAuth = "tls_client_auth" KafkaTLSClientAuth = "tls_client_auth"
KafkaSASL = "sasl" KafkaSASL = "sasl"
KafkaSASLUsername = "sasl_username" KafkaSASLUsername = "sasl_username"
KafkaSASLPassword = "sasl_password" KafkaSASLPassword = "sasl_password"
KafkaSASLMechanism = "sasl_mechanism" KafkaSASLMechanism = "sasl_mechanism"
KafkaClientTLSCert = "client_tls_cert" KafkaClientTLSCert = "client_tls_cert"
KafkaClientTLSKey = "client_tls_key" KafkaClientTLSKey = "client_tls_key"
KafkaVersion = "version" KafkaVersion = "version"
KafkaBatchSize = "batch_size" KafkaBatchSize = "batch_size"
KafkaCompressionCodec = "compression_codec" KafkaBatchCommitTimeout = "batch_commit_timeout"
KafkaCompressionLevel = "compression_level" KafkaCompressionCodec = "compression_codec"
KafkaCompressionLevel = "compression_level"
EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE" EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS" EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
@ -77,6 +78,7 @@ const (
EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION" EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE" EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE"
EnvKafkaBatchCommitTimeout = "MINIO_NOTIFY_KAFKA_BATCH_COMMIT_TIMEOUT"
EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC" EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC"
EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL" EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL"
) )
@ -91,14 +93,15 @@ var codecs = map[string]sarama.CompressionCodec{
// KafkaArgs - Kafka target arguments. // KafkaArgs - Kafka target arguments.
type KafkaArgs struct { type KafkaArgs struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Brokers []xnet.Host `json:"brokers"` Brokers []xnet.Host `json:"brokers"`
Topic string `json:"topic"` Topic string `json:"topic"`
QueueDir string `json:"queueDir"` QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"` QueueLimit uint64 `json:"queueLimit"`
Version string `json:"version"` Version string `json:"version"`
BatchSize uint32 `json:"batchSize"` BatchSize uint32 `json:"batchSize"`
TLS struct { BatchCommitTimeout time.Duration `json:"batchCommitTimeout"`
TLS struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"` RootCAs *x509.CertPool `json:"-"`
SkipVerify bool `json:"skipVerify"` SkipVerify bool `json:"skipVerify"`
@ -146,6 +149,11 @@ func (k KafkaArgs) Validate() error {
return errors.New("batch should be enabled only if queue dir is enabled") return errors.New("batch should be enabled only if queue dir is enabled")
} }
} }
if k.BatchCommitTimeout > 0 {
if k.QueueDir == "" || k.BatchSize <= 1 {
return errors.New("batch commit timeout should be set only if queue dir is enabled and batch size > 1")
}
}
return nil return nil
} }
@ -159,7 +167,7 @@ type KafkaTarget struct {
producer sarama.SyncProducer producer sarama.SyncProducer
config *sarama.Config config *sarama.Config
store store.Store[event.Event] store store.Store[event.Event]
batch *store.Batch[string, *sarama.ProducerMessage] batch *store.Batch[event.Event]
loggerOnce logger.LogOnce loggerOnce logger.LogOnce
quitCh chan struct{} quitCh chan struct{}
} }
@ -199,7 +207,11 @@ func (target *KafkaTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Kafka connection is active. // Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error { func (target *KafkaTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) if target.batch != nil {
return target.batch.Add(eventData)
}
_, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -220,80 +232,59 @@ func (target *KafkaTarget) send(eventData event.Event) error {
return err return err
} }
// SendFromStore - reads an event from store and sends it to Kafka. // sendMultiple sends multiple messages to the kafka.
func (target *KafkaTarget) SendFromStore(key store.Key) error { func (target *KafkaTarget) sendMultiple(events []event.Event) error {
if err := target.init(); err != nil { if target.producer == nil {
return err return store.ErrNotConnected
} }
var msgs []*sarama.ProducerMessage
// If batch is enabled, the event will be batched in memory for _, event := range events {
// and will be committed once the batch is full. msg, err := target.toProducerMessage(event)
if target.batch != nil { if err != nil {
return target.addToBatch(key)
}
eventData, eErr := target.store.Get(key.Name)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if isKafkaConnErr(err) {
return store.ErrNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(key.Name)
}
func (target *KafkaTarget) addToBatch(key store.Key) error {
if target.batch.IsFull() {
if err := target.commitBatch(); err != nil {
return err return err
} }
msgs = append(msgs, msg)
} }
if _, ok := target.batch.GetByKey(key.Name); !ok { return target.producer.SendMessages(msgs)
eventData, err := target.store.Get(key.Name) }
// SendFromStore - reads an event from store and sends it to Kafka.
func (target *KafkaTarget) SendFromStore(key store.Key) (err error) {
if err = target.init(); err != nil {
return err
}
switch {
case key.ItemCount == 1:
var event event.Event
event, err = target.store.Get(key)
if err != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(err) {
return nil
}
return err
}
err = target.send(event)
case key.ItemCount > 1:
var events []event.Event
events, err = target.store.GetMultiple(key)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
} }
return err return err
} }
msg, err := target.toProducerMessage(eventData) err = target.sendMultiple(events)
if err != nil {
return err
}
if err = target.batch.Add(key.Name, msg); err != nil {
return err
}
} }
// commit the batch if the key is the last one present in the store.
if key.IsLast || target.batch.IsFull() {
return target.commitBatch()
}
return nil
}
func (target *KafkaTarget) commitBatch() error {
keys, msgs, err := target.batch.GetAll()
if err != nil { if err != nil {
return err
}
if err = target.producer.SendMessages(msgs); err != nil {
if isKafkaConnErr(err) { if isKafkaConnErr(err) {
return store.ErrNotConnected return store.ErrNotConnected
} }
return err return err
} }
return target.store.DelList(keys) // Delete the event from store.
return target.store.Del(key)
} }
func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.ProducerMessage, error) { func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.ProducerMessage, error) {
@ -319,7 +310,18 @@ func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.Pro
func (target *KafkaTarget) Close() error { func (target *KafkaTarget) Close() error {
close(target.quitCh) close(target.quitCh)
if target.batch != nil {
target.batch.Close()
}
if target.producer != nil { if target.producer != nil {
if target.store != nil {
// It is safe to abort the current transaction if
// queue_dir is configured
target.producer.AbortTxn()
} else {
target.producer.CommitTxn()
}
target.producer.Close() target.producer.Close()
return target.client.Close() return target.client.Close()
} }
@ -442,10 +444,14 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
loggerOnce: loggerOnce, loggerOnce: loggerOnce,
quitCh: make(chan struct{}), quitCh: make(chan struct{}),
} }
if target.store != nil { if target.store != nil {
if args.BatchSize > 1 { if args.BatchSize > 1 {
target.batch = store.NewBatch[string, *sarama.ProducerMessage](args.BatchSize) target.batch = store.NewBatch[event.Event](store.BatchConfig[event.Event]{
Limit: args.BatchSize,
Log: loggerOnce,
Store: queueStore,
CommitTimeout: args.BatchCommitTimeout,
})
} }
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
} }

View File

@ -180,7 +180,7 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error {
return err return err
} }
eventData, err := target.store.Get(key.Name) eventData, err := target.store.Get(key)
if err != nil { if err != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -195,14 +195,15 @@ func (target *MQTTTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Save - saves the events to the store if queuestore is configured, which will // Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active. // be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error { func (target *MQTTTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err

View File

@ -198,7 +198,8 @@ func (target *MySQLTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the SQL connection is active. // Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error { func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -273,7 +274,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error {
} }
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -291,7 +292,7 @@ func (target *MySQLTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - closes underneath connections to MySQL database. // Close - closes underneath connections to MySQL database.

View File

@ -299,7 +299,8 @@ func (target *NATSTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Nats connection is active. // Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error { func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
@ -353,7 +354,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error {
return err return err
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -367,7 +368,7 @@ func (target *NATSTarget) SendFromStore(key store.Key) error {
return err return err
} }
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - closes underneath connections to NATS server. // Close - closes underneath connections to NATS server.

View File

@ -147,7 +147,8 @@ func (target *NSQTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the nsq connection is active. // Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error { func (target *NSQTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
@ -188,7 +189,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error {
return err return err
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -203,7 +204,7 @@ func (target *NSQTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - closes underneath connections to NSQD server. // Close - closes underneath connections to NSQD server.

View File

@ -196,7 +196,8 @@ func (target *PostgreSQLTarget) isActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error { func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
@ -275,7 +276,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error {
} }
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully. // Such events will not exist and wouldve been already been sent successfully.
@ -293,7 +294,7 @@ func (target *PostgreSQLTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - closes underneath connections to PostgreSQL database. // Close - closes underneath connections to PostgreSQL database.

View File

@ -173,7 +173,8 @@ func (target *RedisTarget) isActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error { func (target *RedisTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -252,7 +253,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error {
target.firstPing = true target.firstPing = true
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully. // Such events will not exist and would've been already been sent successfully.
@ -270,7 +271,7 @@ func (target *RedisTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - releases the resources used by the pool. // Close - releases the resources used by the pool.

View File

@ -146,7 +146,8 @@ func (target *WebhookTarget) isActive() (bool, error) {
// which will be replayed when the webhook connection is active. // which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error { func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) _, err := target.store.Put(eventData)
return err
} }
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
@ -213,7 +214,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error {
return err return err
} }
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully. // Such events will not exist and would've been already been sent successfully.
@ -231,7 +232,7 @@ func (target *WebhookTarget) SendFromStore(key store.Key) error {
} }
// Delete the event from store. // Delete the event from store.
return target.store.Del(key.Name) return target.store.Del(key)
} }
// Close - does nothing and available for interface compatibility. // Close - does nothing and available for interface compatibility.

View File

@ -418,7 +418,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
if !isDirQueue { if !isDirQueue {
err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout) err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
} else { } else {
err = h.store.PutMultiple(entries) _, err = h.store.PutMultiple(entries)
} }
if err != nil { if err != nil {
@ -530,7 +530,7 @@ func New(config Config) (*Target, error) {
// SendFromStore - reads the log from store and sends it to webhook. // SendFromStore - reads the log from store and sends it to webhook.
func (h *Target) SendFromStore(key store.Key) (err error) { func (h *Target) SendFromStore(key store.Key) (err error) {
var eventData []byte var eventData []byte
eventData, err = h.store.GetRaw(key.Name) eventData, err = h.store.GetRaw(key)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
@ -552,7 +552,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
} }
// Delete the event from store. // Delete the event from store.
return h.store.Del(key.Name) return h.store.Del(key)
} }
// Send the log message 'entry' to the http target. // Send the log message 'entry' to the http target.

View File

@ -315,7 +315,8 @@ func (h *Target) IsOnline(_ context.Context) bool {
func (h *Target) Send(ctx context.Context, entry interface{}) error { func (h *Target) Send(ctx context.Context, entry interface{}) error {
if h.store != nil { if h.store != nil {
// save the entry to the queue store which will be replayed to the target. // save the entry to the queue store which will be replayed to the target.
return h.store.Put(entry) _, err := h.store.Put(entry)
return err
} }
h.logChMu.RLock() h.logChMu.RLock()
defer h.logChMu.RUnlock() defer h.logChMu.RUnlock()
@ -344,7 +345,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
// SendFromStore - reads the log from store and sends it to kafka. // SendFromStore - reads the log from store and sends it to kafka.
func (h *Target) SendFromStore(key store.Key) (err error) { func (h *Target) SendFromStore(key store.Key) (err error) {
auditEntry, err := h.store.Get(key.Name) auditEntry, err := h.store.Get(key)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
@ -358,7 +359,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
return return
} }
// Delete the event from store. // Delete the event from store.
return h.store.Del(key.Name) return h.store.Del(key)
} }
// Cancel - cancels the target // Cancel - cancels the target

View File

@ -18,100 +18,123 @@
package store package store
import ( import (
"context"
"errors" "errors"
"fmt"
"sync" "sync"
"time"
) )
// ErrBatchFull indicates that the batch is full // ErrBatchFull indicates that the batch is full
var ErrBatchFull = errors.New("batch is full") var ErrBatchFull = errors.New("batch is full")
type key interface { const defaultCommitTimeout = 30 * time.Second
string | int | int64
}
// Batch represents an ordered batch // Batch represents an ordered batch
type Batch[K key, T any] struct { type Batch[I any] struct {
keys []K items []I
items map[K]T limit uint32
limit uint32 store Store[I]
quitCh chan struct{}
sync.Mutex sync.Mutex
} }
// BatchConfig represents the batch config
type BatchConfig[I any] struct {
Limit uint32
Store Store[I]
CommitTimeout time.Duration
Log logger
}
// Add adds the item to the batch // Add adds the item to the batch
func (b *Batch[K, T]) Add(key K, item T) error { func (b *Batch[I]) Add(item I) error {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
if b.isFull() { if b.isFull() {
return ErrBatchFull if b.store == nil {
return ErrBatchFull
}
// commit batch to store
if err := b.commit(); err != nil {
return err
}
} }
if _, ok := b.items[key]; !ok { b.items = append(b.items, item)
b.keys = append(b.keys, key)
}
b.items[key] = item
return nil return nil
} }
// GetAll fetches the items and resets the batch
// Returned items are not referenced by the batch
func (b *Batch[K, T]) GetAll() (orderedKeys []K, orderedItems []T, err error) {
b.Lock()
defer b.Unlock()
orderedKeys = append([]K(nil), b.keys...)
for _, key := range orderedKeys {
item, ok := b.items[key]
if !ok {
err = fmt.Errorf("item not found for the key: %v; should not happen;", key)
return
}
orderedItems = append(orderedItems, item)
delete(b.items, key)
}
b.keys = b.keys[:0]
return
}
// GetByKey will get the batch item by the provided key
func (b *Batch[K, T]) GetByKey(key K) (T, bool) {
b.Lock()
defer b.Unlock()
item, ok := b.items[key]
return item, ok
}
// Len returns the no of items in the batch // Len returns the no of items in the batch
func (b *Batch[K, T]) Len() int { func (b *Batch[_]) Len() int {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
return len(b.keys) return len(b.items)
} }
// IsFull checks if the batch is full or not func (b *Batch[_]) isFull() bool {
func (b *Batch[K, T]) IsFull() bool {
b.Lock()
defer b.Unlock()
return b.isFull()
}
func (b *Batch[K, T]) isFull() bool {
return len(b.items) >= int(b.limit) return len(b.items) >= int(b.limit)
} }
// NewBatch creates a new batch func (b *Batch[I]) commit() error {
func NewBatch[K key, T any](limit uint32) *Batch[K, T] { switch len(b.items) {
return &Batch[K, T]{ case 0:
keys: make([]K, 0, limit), return nil
items: make(map[K]T, limit), case 1:
limit: limit, _, err := b.store.Put(b.items[0])
return err
default:
} }
if _, err := b.store.PutMultiple(b.items); err != nil {
return err
}
b.items = make([]I, 0, b.limit)
return nil
}
// Close commits the pending items and quits the goroutines
func (b *Batch[I]) Close() error {
defer func() {
close(b.quitCh)
}()
b.Lock()
defer b.Unlock()
return b.commit()
}
// NewBatch creates a new batch
func NewBatch[I any](config BatchConfig[I]) *Batch[I] {
if config.CommitTimeout == 0 {
config.CommitTimeout = defaultCommitTimeout
}
quitCh := make(chan struct{})
batch := &Batch[I]{
items: make([]I, 0, config.Limit),
limit: config.Limit,
store: config.Store,
quitCh: quitCh,
}
if batch.store != nil {
go func() {
commitTicker := time.NewTicker(config.CommitTimeout)
defer commitTicker.Stop()
for {
select {
case <-commitTicker.C:
case <-batch.quitCh:
return
}
batch.Lock()
err := batch.commit()
batch.Unlock()
if err != nil {
config.Log(context.Background(), err, "")
}
}
}()
}
return batch
} }

View File

@ -18,109 +18,202 @@
package store package store
import ( import (
"errors" "context"
"sync" "sync"
"testing" "testing"
"time"
) )
func TestBatch(t *testing.T) { func TestBatchCommit(t *testing.T) {
defer func() {
if err := tearDownQueueStore(); err != nil {
t.Fatalf("Failed to tear down store; %v", err)
}
}()
store, err := setUpQueueStore(queueDir, 100)
if err != nil {
t.Fatalf("Failed to create a queue store; %v", err)
}
var limit uint32 = 100 var limit uint32 = 100
batch := NewBatch[int, int](limit)
batch := NewBatch[TestItem](BatchConfig[TestItem]{
Limit: limit,
Store: store,
CommitTimeout: 5 * time.Minute,
Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
t.Log(err)
},
})
defer batch.Close()
for i := 0; i < int(limit); i++ { for i := 0; i < int(limit); i++ {
if err := batch.Add(i, i); err != nil { if err := batch.Add(testItem); err != nil {
t.Fatalf("failed to add %v; %v", i, err) t.Fatalf("failed to add %v; %v", i, err)
} }
if _, ok := batch.GetByKey(i); !ok {
t.Fatalf("failed to get the item by key %v after adding", i)
}
}
err := batch.Add(101, 101)
if err == nil || !errors.Is(err, ErrBatchFull) {
t.Fatalf("Expected err %v but got %v", ErrBatchFull, err)
}
if !batch.IsFull() {
t.Fatal("Expected batch.IsFull to be true but got false")
} }
batchLen := batch.Len() batchLen := batch.Len()
if batchLen != int(limit) { if batchLen != int(limit) {
t.Fatalf("expected batch length to be %v but got %v", limit, batchLen) t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen)
} }
keys, items, err := batch.GetAll()
if err != nil { keys := store.List()
t.Fatalf("unable to get the items from the batch; %v", err) if len(keys) > 0 {
t.Fatalf("Expected empty store list but got len(list) %v", len(keys))
} }
if len(items) != int(limit) { if err := batch.Add(testItem); err != nil {
t.Fatalf("Expected length of the batch items to be %v but got %v", limit, len(items)) t.Fatalf("unable to add to the batch; %v", err)
}
if len(keys) != int(limit) {
t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items))
} }
batchLen = batch.Len() batchLen = batch.Len()
if batchLen != 0 { if batchLen != 1 {
t.Fatalf("expected batch to be empty but still left with %d items", batchLen) t.Fatalf("expected batch length to be 1 but got %v", batchLen)
}
// Add duplicate entries
for i := 0; i < 10; i++ {
if err := batch.Add(99, 99); err != nil {
t.Fatalf("failed to add duplicate item %v to batch after Get; %v", i, err)
}
}
if _, ok := batch.GetByKey(99); !ok {
t.Fatal("failed to get the duplicxate item by key '99' after adding")
}
keys, items, err = batch.GetAll()
if err != nil {
t.Fatalf("unable to get the items from the batch; %v", err)
}
if len(items) != 1 {
t.Fatalf("Expected length of the batch items to be 1 but got %v", len(items))
} }
keys = store.List()
if len(keys) != 1 { if len(keys) != 1 {
t.Fatalf("Expected length of the batch keys to be 1 but got %v", len(items)) t.Fatalf("expected len(store.List())=1; but got %v", len(keys))
} }
// try adding again after Get. key := keys[0]
if !key.Compress {
t.Fatal("expected key.Compress=true; but got false")
}
if key.ItemCount != int(limit) {
t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount)
}
items, err := store.GetMultiple(key)
if err != nil {
t.Fatalf("unable to read key %v; %v", key.String(), err)
}
if len(items) != int(limit) {
t.Fatalf("expected len(items)=%d; but got %v", limit, len(items))
}
}
func TestBatchCommitOnExit(t *testing.T) {
defer func() {
if err := tearDownQueueStore(); err != nil {
t.Fatalf("Failed to tear down store; %v", err)
}
}()
store, err := setUpQueueStore(queueDir, 100)
if err != nil {
t.Fatalf("Failed to create a queue store; %v", err)
}
var limit uint32 = 100
batch := NewBatch[TestItem](BatchConfig[TestItem]{
Limit: limit,
Store: store,
CommitTimeout: 5 * time.Minute,
Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
t.Log([]any{err, id, errKind}...)
},
})
for i := 0; i < int(limit); i++ { for i := 0; i < int(limit); i++ {
if err := batch.Add(i, i); err != nil { if err := batch.Add(testItem); err != nil {
t.Fatalf("failed to add item %v to batch after Get; %v", i, err) t.Fatalf("failed to add %v; %v", i, err)
}
if _, ok := batch.GetByKey(i); !ok {
t.Fatalf("failed to get the item by key %v after adding", i)
} }
} }
batch.Close()
time.Sleep(1 * time.Second)
batchLen := batch.Len()
if batchLen != 0 {
t.Fatalf("Expected batch.Len()=0; but got %v", batchLen)
}
keys := store.List()
if len(keys) != 1 {
t.Fatalf("expected len(store.List())=1; but got %v", len(keys))
}
key := keys[0]
if !key.Compress {
t.Fatal("expected key.Compress=true; but got false")
}
if key.ItemCount != int(limit) {
t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount)
}
items, err := store.GetMultiple(key)
if err != nil {
t.Fatalf("unable to read key %v; %v", key.String(), err)
}
if len(items) != int(limit) {
t.Fatalf("expected len(items)=%d; but got %v", limit, len(items))
}
} }
func TestBatchWithConcurrency(t *testing.T) { func TestBatchWithConcurrency(t *testing.T) {
defer func() {
if err := tearDownQueueStore(); err != nil {
t.Fatalf("Failed to tear down store; %v", err)
}
}()
store, err := setUpQueueStore(queueDir, 100)
if err != nil {
t.Fatalf("Failed to create a queue store; %v", err)
}
var limit uint32 = 100 var limit uint32 = 100
batch := NewBatch[int, int](limit)
batch := NewBatch[TestItem](BatchConfig[TestItem]{
Limit: limit,
Store: store,
CommitTimeout: 5 * time.Minute,
Log: func(ctx context.Context, err error, id string, errKind ...interface{}) {
t.Log(err)
},
})
defer batch.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < int(limit); i++ { for i := 0; i < int(limit); i++ {
wg.Add(1) wg.Add(1)
go func(item int) { go func(key int) {
defer wg.Done() defer wg.Done()
if err := batch.Add(item, item); err != nil { if err := batch.Add(testItem); err != nil {
t.Errorf("failed to add item %v; %v", item, err) t.Errorf("failed to add item %v; %v", key, err)
return return
} }
if _, ok := batch.GetByKey(item); !ok {
t.Errorf("failed to get the item by key %v after adding", item)
}
}(i) }(i)
} }
wg.Wait() wg.Wait()
keys, items, err := batch.GetAll() batchLen := batch.Len()
if batchLen != int(limit) {
t.Fatalf("Expected batch.Len() %v; but got %v", limit, batchLen)
}
keys := store.List()
if len(keys) > 0 {
t.Fatalf("Expected empty store list but got len(list) %v", len(keys))
}
if err := batch.Add(testItem); err != nil {
t.Fatalf("unable to add to the batch; %v", err)
}
batchLen = batch.Len()
if batchLen != 1 {
t.Fatalf("expected batch length to be 1 but got %v", batchLen)
}
keys = store.List()
if len(keys) != 1 {
t.Fatalf("expected len(store.List())=1; but got %v", len(keys))
}
key := keys[0]
if !key.Compress {
t.Fatal("expected key.Compress=true; but got false")
}
if key.ItemCount != int(limit) {
t.Fatalf("expected key.ItemCount=%d; but got %v", limit, key.ItemCount)
}
items, err := store.GetMultiple(key)
if err != nil { if err != nil {
t.Fatalf("unable to get the items from the batch; %v", err) t.Fatalf("unable to read key %v; %v", key.String(), err)
} }
if len(items) != int(limit) { if len(items) != int(limit) {
t.Fatalf("expected batch length %v but got %v", limit, len(items)) t.Fatalf("expected len(items)=%d; but got %v", limit, len(items))
}
if len(keys) != int(limit) {
t.Fatalf("Expected length of the batch keys to be %v but got %v", limit, len(items))
}
batchLen := batch.Len()
if batchLen != 0 {
t.Fatalf("expected batch to be empty but still left with %d items", batchLen)
} }
} }

View File

@ -18,24 +18,25 @@
package store package store
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/s2"
"github.com/valyala/bytebufferpool" "github.com/valyala/bytebufferpool"
) )
const ( const (
defaultLimit = 100000 // Default store limit. defaultLimit = 100000 // Default store limit.
defaultExt = ".unknown" defaultExt = ".unknown"
compressExt = ".snappy"
) )
// errLimitExceeded error is sent when the maximum limit is reached. // errLimitExceeded error is sent when the maximum limit is reached.
@ -83,18 +84,12 @@ func (store *QueueStore[_]) Open() error {
return err return err
} }
// Truncate entries.
if uint64(len(files)) > store.entryLimit {
files = files[:store.entryLimit]
}
for _, file := range files { for _, file := range files {
if file.IsDir() { if file.IsDir() {
continue continue
} }
key := strings.TrimSuffix(file.Name(), store.fileExt)
if fi, err := file.Info(); err == nil { if fi, err := file.Info(); err == nil {
store.entries[key] = fi.ModTime().UnixNano() store.entries[file.Name()] = fi.ModTime().UnixNano()
} }
} }
@ -107,96 +102,138 @@ func (store *QueueStore[_]) Delete() error {
} }
// PutMultiple - puts an item to the store. // PutMultiple - puts an item to the store.
func (store *QueueStore[I]) PutMultiple(item []I) error { func (store *QueueStore[I]) PutMultiple(items []I) (Key, error) {
// Generate a new UUID for the key. // Generate a new UUID for the key.
key, err := uuid.NewRandom() uid, err := uuid.NewRandom()
if err != nil { if err != nil {
return err return Key{}, err
} }
store.Lock() store.Lock()
defer store.Unlock() defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit { if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded return Key{}, errLimitExceeded
} }
return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item) key := Key{
Name: uid.String(),
ItemCount: len(items),
Compress: true,
Extension: store.fileExt,
}
return key, store.multiWrite(key, items)
} }
// multiWrite - writes an item to the directory. // multiWrite - writes an item to the directory.
func (store *QueueStore[I]) multiWrite(key string, item []I) error { func (store *QueueStore[I]) multiWrite(key Key, items []I) (err error) {
buf := bytebufferpool.Get() buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf) defer bytebufferpool.Put(buf)
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
for i := range item { for i := range items {
err := enc.Encode(item[i]) if err = enc.Encode(items[i]); err != nil {
if err != nil {
return err return err
} }
} }
b := buf.Bytes()
path := filepath.Join(store.directory, key+store.fileExt) path := filepath.Join(store.directory, key.String())
err := os.WriteFile(path, b, os.FileMode(0o770)) if key.Compress {
err = os.WriteFile(path, s2.Encode(nil, buf.Bytes()), os.FileMode(0o770))
} else {
err = os.WriteFile(path, buf.Bytes(), os.FileMode(0o770))
}
buf.Reset() buf.Reset()
if err != nil { if err != nil {
return err return err
} }
// Increment the item count. // Increment the item count.
store.entries[key] = time.Now().UnixNano() store.entries[key.String()] = time.Now().UnixNano()
return nil return
} }
// write - writes an item to the directory. // write - writes an item to the directory.
func (store *QueueStore[I]) write(key string, item I) error { func (store *QueueStore[I]) write(key Key, item I) error {
// Marshalls the item. // Marshalls the item.
eventData, err := json.Marshal(item) eventData, err := json.Marshal(item)
if err != nil { if err != nil {
return err return err
} }
return store.writeBytes(key, eventData)
}
path := filepath.Join(store.directory, key+store.fileExt) // writeBytes - writes bytes to the directory.
if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil { func (store *QueueStore[I]) writeBytes(key Key, b []byte) (err error) {
return err path := filepath.Join(store.directory, key.String())
if key.Compress {
err = os.WriteFile(path, s2.Encode(nil, b), os.FileMode(0o770))
} else {
err = os.WriteFile(path, b, os.FileMode(0o770))
} }
if err != nil {
return err
}
// Increment the item count. // Increment the item count.
store.entries[key] = time.Now().UnixNano() store.entries[key.String()] = time.Now().UnixNano()
return nil return nil
} }
// Put - puts an item to the store. // Put - puts an item to the store.
func (store *QueueStore[I]) Put(item I) error { func (store *QueueStore[I]) Put(item I) (Key, error) {
store.Lock() store.Lock()
defer store.Unlock() defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit { if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded return Key{}, errLimitExceeded
} }
// Generate a new UUID for the key. // Generate a new UUID for the key.
key, err := uuid.NewRandom() uid, err := uuid.NewRandom()
if err != nil { if err != nil {
return err return Key{}, err
} }
return store.write(key.String(), item) key := Key{
Name: uid.String(),
Extension: store.fileExt,
ItemCount: 1,
}
return key, store.write(key, item)
}
// PutRaw - puts the raw bytes to the store
func (store *QueueStore[I]) PutRaw(b []byte) (Key, error) {
store.Lock()
defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit {
return Key{}, errLimitExceeded
}
// Generate a new UUID for the key.
uid, err := uuid.NewRandom()
if err != nil {
return Key{}, err
}
key := Key{
Name: uid.String(),
Extension: store.fileExt,
}
return key, store.writeBytes(key, b)
} }
// GetRaw - gets an item from the store. // GetRaw - gets an item from the store.
func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) { func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) {
store.RLock() store.RLock()
defer func(store *QueueStore[I]) { defer func(store *QueueStore[I]) {
store.RUnlock() store.RUnlock()
if err != nil { if err != nil && !os.IsNotExist(err) {
// Upon error we remove the entry. // Upon error we remove the entry.
store.Del(key) store.Del(key)
} }
}(store) }(store)
raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) raw, err = os.ReadFile(filepath.Join(store.directory, key.String()))
if err != nil { if err != nil {
return return
} }
@ -209,19 +246,19 @@ func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) {
} }
// Get - gets an item from the store. // Get - gets an item from the store.
func (store *QueueStore[I]) Get(key string) (item I, err error) { func (store *QueueStore[I]) Get(key Key) (item I, err error) {
store.RLock() store.RLock()
defer func(store *QueueStore[I]) { defer func(store *QueueStore[I]) {
store.RUnlock() store.RUnlock()
if err != nil { if err != nil && !os.IsNotExist(err) {
// Upon error we remove the entry. // Upon error we remove the entry.
store.Del(key) store.Del(key)
} }
}(store) }(store)
var eventData []byte var eventData []byte
eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) eventData, err = os.ReadFile(filepath.Join(store.directory, key.String()))
if err != nil { if err != nil {
return item, err return item, err
} }
@ -237,28 +274,52 @@ func (store *QueueStore[I]) Get(key string) (item I, err error) {
return item, nil return item, nil
} }
// GetMultiple will read the multi payload file and fetch the items
func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) {
store.RLock()
defer func(store *QueueStore[I]) {
store.RUnlock()
if err != nil && !os.IsNotExist(err) {
// Upon error we remove the entry.
store.Del(key)
}
}(store)
raw, err := os.ReadFile(filepath.Join(store.directory, key.String()))
if err != nil {
return
}
var decoder *jsoniter.Decoder
if key.Compress {
decodedBytes, err := s2.Decode(nil, raw)
if err != nil {
return nil, err
}
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes))
} else {
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
}
for decoder.More() {
var item I
if err := decoder.Decode(&item); err != nil {
return nil, err
}
items = append(items, item)
}
return
}
// Del - Deletes an entry from the store. // Del - Deletes an entry from the store.
func (store *QueueStore[_]) Del(key string) error { func (store *QueueStore[_]) Del(key Key) error {
store.Lock() store.Lock()
defer store.Unlock() defer store.Unlock()
return store.del(key) return store.del(key)
} }
// DelList - Deletes a list of entries from the store.
// Returns an error even if one key fails to be deleted.
func (store *QueueStore[_]) DelList(keys []string) error {
store.Lock()
defer store.Unlock()
for _, key := range keys {
if err := store.del(key); err != nil {
return err
}
}
return nil
}
// Len returns the entry count. // Len returns the entry count.
func (store *QueueStore[_]) Len() int { func (store *QueueStore[_]) Len() int {
store.RLock() store.RLock()
@ -268,30 +329,35 @@ func (store *QueueStore[_]) Len() int {
} }
// lockless call // lockless call
func (store *QueueStore[_]) del(key string) error { func (store *QueueStore[_]) del(key Key) error {
err := os.Remove(filepath.Join(store.directory, key+store.fileExt)) err := os.Remove(filepath.Join(store.directory, key.String()))
// Delete as entry no matter the result // Delete as entry no matter the result
delete(store.entries, key) delete(store.entries, key.String())
return err return err
} }
// List - lists all files registered in the store. // List - lists all files registered in the store.
func (store *QueueStore[_]) List() ([]string, error) { func (store *QueueStore[_]) List() (keys []Key) {
store.RLock() store.RLock()
l := make([]string, 0, len(store.entries)) defer store.RUnlock()
for k := range store.entries {
l = append(l, k) entries := make([]string, 0, len(store.entries))
for entry := range store.entries {
entries = append(entries, entry)
} }
// Sort entries... // Sort entries...
sort.Slice(l, func(i, j int) bool { sort.Slice(entries, func(i, j int) bool {
return store.entries[l[i]] < store.entries[l[j]] return store.entries[entries[i]] < store.entries[entries[j]]
}) })
store.RUnlock()
return l, nil for i := range entries {
keys = append(keys, parseKey(entries[i]))
}
return keys
} }
// list will read all entries from disk. // list will read all entries from disk.
@ -318,9 +384,3 @@ func (store *QueueStore[_]) list() ([]os.DirEntry, error) {
return files, nil return files, nil
} }
// Extension will return the file extension used
// for the items stored in the queue.
func (store *QueueStore[_]) Extension() string {
return store.fileExt
}

View File

@ -18,10 +18,10 @@
package store package store
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings"
"testing" "testing"
) )
@ -66,17 +66,14 @@ func TestQueueStorePut(t *testing.T) {
} }
// Put 100 items. // Put 100 items.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
if err := store.Put(testItem); err != nil { if _, err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
// Count the items. // Count the items.
names, err := store.List() keys := store.List()
if err != nil { if len(keys) != 100 {
t.Fatal(err) t.Fatalf("List() Expected: 100, got %d", len(keys))
}
if len(names) != 100 {
t.Fatalf("List() Expected: 100, got %d", len(names))
} }
} }
@ -93,18 +90,15 @@ func TestQueueStoreGet(t *testing.T) {
} }
// Put 10 items // Put 10 items
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
if err := store.Put(testItem); err != nil { if _, err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
itemKeys, err := store.List() itemKeys := store.List()
if err != nil {
t.Fatal(err)
}
// Get 10 items. // Get 10 items.
if len(itemKeys) == 10 { if len(itemKeys) == 10 {
for _, key := range itemKeys { for _, key := range itemKeys {
item, eErr := store.Get(strings.TrimSuffix(key, testItemExt)) item, eErr := store.Get(key)
if eErr != nil { if eErr != nil {
t.Fatal("Failed to Get the item from the queue store ", eErr) t.Fatal("Failed to Get the item from the queue store ", eErr)
} }
@ -130,18 +124,15 @@ func TestQueueStoreDel(t *testing.T) {
} }
// Put 20 items. // Put 20 items.
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
if err := store.Put(testItem); err != nil { if _, err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
itemKeys, err := store.List() itemKeys := store.List()
if err != nil {
t.Fatal(err)
}
// Remove all the items. // Remove all the items.
if len(itemKeys) == 20 { if len(itemKeys) == 20 {
for _, key := range itemKeys { for _, key := range itemKeys {
err := store.Del(strings.TrimSuffix(key, testItemExt)) err := store.Del(key)
if err != nil { if err != nil {
t.Fatal("queue store Del failed with ", err) t.Fatal("queue store Del failed with ", err)
} }
@ -150,12 +141,9 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatalf("List() Expected: 20, got %d", len(itemKeys)) t.Fatalf("List() Expected: 20, got %d", len(itemKeys))
} }
names, err := store.List() keys := store.List()
if err != nil { if len(keys) != 0 {
t.Fatal(err) t.Fatalf("List() Expected: 0, got %d", len(keys))
}
if len(names) != 0 {
t.Fatalf("List() Expected: 0, got %d", len(names))
} }
} }
@ -172,12 +160,12 @@ func TestQueueStoreLimit(t *testing.T) {
t.Fatal("Failed to create a queue store ", err) t.Fatal("Failed to create a queue store ", err)
} }
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
if err := store.Put(testItem); err != nil { if _, err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
// Should not allow 6th Put. // Should not allow 6th Put.
if err := store.Put(testItem); err == nil { if _, err := store.Put(testItem); err == nil {
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
} }
} }
@ -194,18 +182,15 @@ func TestQueueStoreListN(t *testing.T) {
t.Fatal("Failed to create a queue store ", err) t.Fatal("Failed to create a queue store ", err)
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
if err := store.Put(testItem); err != nil { if _, err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
// Should return all the item keys in the store. // Should return all the item keys in the store.
names, err := store.List() keys := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 10 { if len(keys) != 10 {
t.Fatalf("List() Expected: 10, got %d", len(names)) t.Fatalf("List() Expected: 10, got %d", len(keys))
} }
// re-open // re-open
@ -213,28 +198,154 @@ func TestQueueStoreListN(t *testing.T) {
if err != nil { if err != nil {
t.Fatal("Failed to create a queue store ", err) t.Fatal("Failed to create a queue store ", err)
} }
names, err = store.List() keys = store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 10 { if len(keys) != 10 {
t.Fatalf("List() Expected: 10, got %d", len(names)) t.Fatalf("List() Expected: 10, got %d", len(keys))
} }
if len(names) != store.Len() { if len(keys) != store.Len() {
t.Fatalf("List() Expected: 10, got %d", len(names)) t.Fatalf("List() Expected: 10, got %d", len(keys))
} }
// Delete all // Delete all
for _, key := range names { for _, key := range keys {
err := store.Del(key) err := store.Del(key)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
// Re-list // Re-list
lst, err := store.List() keys = store.List()
if len(lst) > 0 || err != nil { if len(keys) > 0 || err != nil {
t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", lst, err) t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
}
}
func TestMultiplePutGets(t *testing.T) {
defer func() {
if err := tearDownQueueStore(); err != nil {
t.Fatalf("Failed to tear down store; %v", err)
}
}()
store, err := setUpQueueStore(queueDir, 10)
if err != nil {
t.Fatalf("Failed to create a queue store; %v", err)
}
// TestItem{Name: "test-item", Property: "property"}
var items []TestItem
for i := 0; i < 10; i++ {
items = append(items, TestItem{
Name: fmt.Sprintf("test-item-%d", i),
Property: "property",
})
}
if _, err := store.PutMultiple(items); err != nil {
t.Fatalf("failed to put multiple; %v", err)
}
keys := store.List()
if len(keys) != 1 {
t.Fatalf("expected len(keys)=1, but found %d", len(keys))
}
key := keys[0]
if !key.Compress {
t.Fatal("expected the item to be compressed")
}
if key.ItemCount != 10 {
t.Fatalf("expected itemcount=10 but found %v", key.ItemCount)
}
resultItems, err := store.GetMultiple(key)
if err != nil {
t.Fatalf("unable to get multiple items; %v", err)
}
if !reflect.DeepEqual(resultItems, items) {
t.Fatalf("expected item list: %v; but got %v", items, resultItems)
}
if err := store.Del(key); err != nil {
t.Fatalf("unable to Del; %v", err)
}
// Re-list
keys = store.List()
if len(keys) > 0 || err != nil {
t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
}
}
func TestMixedPutGets(t *testing.T) {
defer func() {
if err := tearDownQueueStore(); err != nil {
t.Fatalf("Failed to tear down store; %v", err)
}
}()
store, err := setUpQueueStore(queueDir, 10)
if err != nil {
t.Fatalf("Failed to create a queue store; %v", err)
}
// TestItem{Name: "test-item", Property: "property"}
var items []TestItem
for i := 0; i < 5; i++ {
items = append(items, TestItem{
Name: fmt.Sprintf("test-item-%d", i),
Property: "property",
})
}
if _, err := store.PutMultiple(items); err != nil {
t.Fatalf("failed to put multiple; %v", err)
}
for i := 5; i < 10; i++ {
item := TestItem{
Name: fmt.Sprintf("test-item-%d", i),
Property: "property",
}
if _, err := store.Put(item); err != nil {
t.Fatalf("unable to store.Put(); %v", err)
}
items = append(items, item)
}
keys := store.List()
if len(keys) != 6 {
// 1 multiple + 5 single PUTs
t.Fatalf("expected len(keys)=6, but found %d", len(keys))
}
var resultItems []TestItem
for _, key := range keys {
if key.ItemCount > 1 {
items, err := store.GetMultiple(key)
if err != nil {
t.Fatalf("unable to get multiple items; %v", err)
}
resultItems = append(resultItems, items...)
continue
}
item, err := store.Get(key)
if err != nil {
t.Fatalf("unable to get item; %v", err)
}
resultItems = append(resultItems, item)
}
if !reflect.DeepEqual(resultItems, items) {
t.Fatalf("expected item list: %v; but got %v", items, resultItems)
}
// Delete all
for _, key := range keys {
if err := store.Del(key); err != nil {
t.Fatalf("unable to Del; %v", err)
}
}
// Re-list
keys = store.List()
if len(keys) > 0 || err != nil {
t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
} }
} }

View File

@ -21,6 +21,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -44,23 +45,64 @@ type Target interface {
// Store - Used to persist items. // Store - Used to persist items.
type Store[I any] interface { type Store[I any] interface {
Put(item I) error Put(item I) (Key, error)
PutMultiple(item []I) error PutMultiple(item []I) (Key, error)
Get(key string) (I, error) Get(key Key) (I, error)
GetRaw(key string) ([]byte, error) GetMultiple(key Key) ([]I, error)
GetRaw(key Key) ([]byte, error)
PutRaw(b []byte) (Key, error)
Len() int Len() int
List() ([]string, error) List() []Key
Del(key string) error Del(key Key) error
DelList(key []string) error
Open() error Open() error
Delete() error Delete() error
Extension() string
} }
// Key denotes the key present in the store. // Key denotes the key present in the store.
type Key struct { type Key struct {
Name string Name string
IsLast bool Compress bool
Extension string
ItemCount int
}
// String returns the filepath name
func (k Key) String() string {
keyStr := k.Name
if k.ItemCount > 1 {
keyStr = fmt.Sprintf("%d:%s", k.ItemCount, k.Name)
}
return keyStr + k.Extension + func() string {
if k.Compress {
return compressExt
}
return ""
}()
}
func getItemCount(k string) (count int, err error) {
count = 1
v := strings.Split(k, ":")
if len(v) == 2 {
return strconv.Atoi(v[0])
}
return
}
func parseKey(k string) (key Key) {
key.Name = k
if strings.HasSuffix(k, compressExt) {
key.Compress = true
key.Name = strings.TrimSuffix(key.Name, compressExt)
}
if key.ItemCount, _ = getItemCount(k); key.ItemCount > 1 {
key.Name = strings.TrimPrefix(key.Name, fmt.Sprintf("%d:", key.ItemCount))
}
if vals := strings.Split(key.Name, "."); len(vals) == 2 {
key.Extension = "." + vals[1]
key.Name = strings.TrimSuffix(key.Name, key.Extension)
}
return
} }
// replayItems - Reads the items from the store and replays. // replayItems - Reads the items from the store and replays.
@ -74,18 +116,12 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id s
defer retryTicker.Stop() defer retryTicker.Stop()
for { for {
names, err := store.List() for _, key := range store.List() {
if err != nil { select {
log(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) case keyCh <- key:
} else { // Get next key.
keyCount := len(names) case <-doneCh:
for i, name := range names { return
select {
case keyCh <- Key{strings.TrimSuffix(name, store.Extension()), keyCount == i+1}:
// Get next key.
case <-doneCh:
return
}
} }
} }
@ -114,7 +150,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l
logger( logger(
context.Background(), context.Background(),
fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err), fmt.Errorf("unable to send log entry to '%s' err '%w'", target.Name(), err),
target.Name(), target.Name(),
) )

View File

@ -0,0 +1,146 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package store
import (
"testing"
)
func TestKeyString(t *testing.T) {
testCases := []struct {
key Key
expectedString string
}{
{
key: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
},
expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event",
},
{
key: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Compress: true,
Extension: ".event",
ItemCount: 100,
},
expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
},
{
key: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
ItemCount: 100,
},
expectedString: "100:01894394-d046-4783-ba0d-f1c6885790dc.event",
},
{
key: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Compress: true,
Extension: ".event",
ItemCount: 1,
},
expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
},
{
key: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
ItemCount: 1,
},
expectedString: "01894394-d046-4783-ba0d-f1c6885790dc.event",
},
}
for i, testCase := range testCases {
if testCase.expectedString != testCase.key.String() {
t.Fatalf("case[%v]: key.String() Expected: %s, got %s", i, testCase.expectedString, testCase.key.String())
}
}
}
func TestParseKey(t *testing.T) {
testCases := []struct {
str string
expectedKey Key
}{
{
str: "01894394-d046-4783-ba0d-f1c6885790dc.event",
expectedKey: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
ItemCount: 1,
},
},
{
str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
expectedKey: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Compress: true,
Extension: ".event",
ItemCount: 100,
},
},
{
str: "100:01894394-d046-4783-ba0d-f1c6885790dc.event",
expectedKey: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
ItemCount: 100,
},
},
{
str: "01894394-d046-4783-ba0d-f1c6885790dc.event.snappy",
expectedKey: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Compress: true,
Extension: ".event",
ItemCount: 1,
},
},
{
str: "01894394-d046-4783-ba0d-f1c6885790dc.event",
expectedKey: Key{
Name: "01894394-d046-4783-ba0d-f1c6885790dc",
Extension: ".event",
ItemCount: 1,
},
},
}
for i, testCase := range testCases {
key := parseKey(testCase.str)
if testCase.expectedKey.Name != key.Name {
t.Fatalf("case[%v]: Expected key.Name: %v, got %v", i, testCase.expectedKey.Name, key.Name)
}
if testCase.expectedKey.Compress != key.Compress {
t.Fatalf("case[%v]: Expected key.Compress: %v, got %v", i, testCase.expectedKey.Compress, key.Compress)
}
if testCase.expectedKey.Extension != key.Extension {
t.Fatalf("case[%v]: Expected key.Extension: %v, got %v", i, testCase.expectedKey.Extension, key.Extension)
}
if testCase.expectedKey.ItemCount != key.ItemCount {
t.Fatalf("case[%v]: Expected key.ItemCount: %v, got %v", i, testCase.expectedKey.ItemCount, key.ItemCount)
}
if testCase.expectedKey.String() != key.String() {
t.Fatalf("case[%v]: Expected key.String(): %v, got %v", i, testCase.expectedKey.String(), key.String())
}
}
}