diff --git a/go.mod b/go.mod
index 89a446c58..3ccc29d06 100644
--- a/go.mod
+++ b/go.mod
@@ -50,7 +50,7 @@ require (
github.com/minio/madmin-go/v2 v2.0.17
github.com/minio/minio-go/v7 v7.0.50
github.com/minio/mux v1.9.0
- github.com/minio/pkg v1.6.5
+ github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c
github.com/minio/selfupdate v0.6.0
github.com/minio/sha256-simd v1.0.0
github.com/minio/simdjson-go v0.4.5
diff --git a/go.sum b/go.sum
index 49a5f05af..7d9318124 100644
--- a/go.sum
+++ b/go.sum
@@ -786,8 +786,8 @@ github.com/minio/minio-go/v7 v7.0.50/go.mod h1:IbbodHyjUAguneyucUaahv+VMNs/EOTV9
github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA=
github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ=
github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA=
-github.com/minio/pkg v1.6.5 h1:T9cRNcCLJTFFgQGH0Rzr1CtAWLAIchTsbE0lSztCf40=
-github.com/minio/pkg v1.6.5/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
+github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c h1:Ukw0+d0T/+9lserJfodr4HGIIb3hLkt8tlgMh2vUfZg=
+github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU=
github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
diff --git a/internal/event/event.go b/internal/event/event.go
index 7c6846978..e01a801a6 100644
--- a/internal/event/event.go
+++ b/internal/event/event.go
@@ -30,6 +30,9 @@ const (
// AMZTimeFormat - event time format.
AMZTimeFormat = "2006-01-02T15:04:05.000Z"
+
+ // StoreExtension - file extension of an event file in store
+ StoreExtension = ".event"
)
// Identity represents access key who caused the event.
diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go
index c838a5cc5..cc2356a72 100644
--- a/internal/event/target/amqp.go
+++ b/internal/event/target/amqp.go
@@ -30,6 +30,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
"github.com/rabbitmq/amqp091-go"
)
@@ -117,7 +118,7 @@ type AMQPTarget struct {
args AMQPArgs
conn *amqp091.Connection
connMutex sync.Mutex
- store Store
+ store store.Store[event.Event]
loggerOnce logger.LogOnce
quitCh chan struct{}
@@ -128,6 +129,11 @@ func (target *AMQPTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *AMQPTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *AMQPTarget) Store() event.TargetStore {
return target.store
@@ -197,8 +203,8 @@ func (target *AMQPTarget) channel() (*amqp091.Channel, chan amqp091.Confirmation
conn, err = amqp091.Dial(target.args.URL.String())
if err != nil {
- if IsConnRefusedErr(err) {
- return nil, nil, errNotConnected
+ if xnet.IsConnRefusedErr(err) {
+ return nil, nil, store.ErrNotConnected
}
return nil, nil, err
}
@@ -330,7 +336,7 @@ func (target *AMQPTarget) init() error {
func (target *AMQPTarget) initAMQP() error {
conn, err := amqp091.Dial(target.args.URL.String())
if err != nil {
- if IsConnRefusedErr(err) || IsConnResetErr(err) {
+ if xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
return err
@@ -342,11 +348,11 @@ func (target *AMQPTarget) initAMQP() error {
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err)
}
}
@@ -355,12 +361,12 @@ func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTa
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
- store: store,
+ store: queueStore,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/common.go b/internal/event/target/common.go
deleted file mode 100644
index 9a5eae111..000000000
--- a/internal/event/target/common.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package target
-
-import "github.com/google/uuid"
-
-func getNewUUID() (string, error) {
- u, err := uuid.NewRandom()
- if err != nil {
- return "", err
- }
-
- return u.String(), nil
-}
diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go
index 80a9b81e1..b8f6eee38 100644
--- a/internal/event/target/elasticsearch.go
+++ b/internal/event/target/elasticsearch.go
@@ -36,6 +36,7 @@ import (
"github.com/minio/highwayhash"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
"github.com/pkg/errors"
)
@@ -157,7 +158,7 @@ type ElasticsearchTarget struct {
id event.TargetID
args ElasticsearchArgs
client esClient
- store Store
+ store store.Store[event.Event]
loggerOnce logger.LogOnce
quitCh chan struct{}
}
@@ -167,6 +168,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *ElasticsearchTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *ElasticsearchTarget) Store() event.TargetStore {
return target.store
@@ -212,7 +218,7 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error {
err = target.send(eventData)
if xnet.IsNetworkOrHostDown(err, false) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -284,7 +290,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {
if err := target.send(eventData); err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -348,7 +354,7 @@ func (target *ElasticsearchTarget) initElasticsearch() error {
err := target.checkAndInitClient(ctx)
if err != nil {
- if err != errNotConnected {
+ if err != store.ErrNotConnected {
target.loggerOnce(context.Background(), err, target.ID().String())
}
return err
@@ -359,11 +365,11 @@ func (target *ElasticsearchTarget) initElasticsearch() error {
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err)
}
}
@@ -371,13 +377,13 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
- store: store,
+ store: queueStore,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
@@ -415,7 +421,7 @@ func (c *esClientV7) getServerSupportStatus(ctx context.Context) (ESSupportStatu
c.Info.WithContext(ctx),
)
if err != nil {
- return ESSUnknown, "", errNotConnected
+ return ESSUnknown, "", store.ErrNotConnected
}
defer resp.Body.Close()
@@ -485,7 +491,7 @@ func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error
c.Ping.WithContext(ctx),
)
if err != nil {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go
index de2483f75..9894a42d4 100644
--- a/internal/event/target/kafka.go
+++ b/internal/event/target/kafka.go
@@ -31,6 +31,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
"github.com/Shopify/sarama"
@@ -129,7 +130,7 @@ type KafkaTarget struct {
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
- store Store
+ store store.Store[event.Event]
loggerOnce logger.LogOnce
quitCh chan struct{}
}
@@ -139,6 +140,11 @@ func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *KafkaTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *KafkaTarget) Store() event.TargetStore {
return target.store
@@ -154,7 +160,7 @@ func (target *KafkaTarget) IsActive() (bool, error) {
func (target *KafkaTarget) isActive() (bool, error) {
if !target.args.pingBrokers() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return true, nil
}
@@ -178,7 +184,7 @@ func (target *KafkaTarget) Save(eventData event.Event) error {
// send - sends an event to the kafka.
func (target *KafkaTarget) send(eventData event.Event) error {
if target.producer == nil {
- return errNotConnected
+ return store.ErrNotConnected
}
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@@ -224,7 +230,7 @@ func (target *KafkaTarget) Send(eventKey string) error {
if err != sarama.ErrOutOfBrokers {
return err
}
- return errNotConnected
+ return store.ErrNotConnected
}
}
@@ -242,7 +248,7 @@ func (target *KafkaTarget) Send(eventKey string) error {
if err != nil {
// Sarama opens the ciruit breaker after 3 consecutive connection failures.
if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -330,7 +336,7 @@ func (target *KafkaTarget) initKafka() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -338,11 +344,11 @@ func (target *KafkaTarget) initKafka() error {
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err)
}
}
@@ -350,13 +356,13 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
- store: store,
+ store: queueStore,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go
index 4f83f55c7..c5f315c1d 100644
--- a/internal/event/target/mqtt.go
+++ b/internal/event/target/mqtt.go
@@ -31,6 +31,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
)
@@ -111,7 +112,7 @@ type MQTTTarget struct {
id event.TargetID
args MQTTArgs
client mqtt.Client
- store Store
+ store store.Store[event.Event]
quitCh chan struct{}
loggerOnce logger.LogOnce
}
@@ -121,6 +122,11 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *MQTTTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *MQTTTarget) Store() event.TargetStore {
return target.store
@@ -136,7 +142,7 @@ func (target *MQTTTarget) IsActive() (bool, error) {
func (target *MQTTTarget) isActive() (bool, error) {
if !target.client.IsConnectionOpen() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return true, nil
}
@@ -156,7 +162,7 @@ func (target *MQTTTarget) send(eventData event.Event) error {
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
if !token.WaitTimeout(reconnectInterval) {
- return errNotConnected
+ return store.ErrNotConnected
}
return token.Error()
}
@@ -245,7 +251,7 @@ func (target *MQTTTarget) initMQTT() error {
token := target.client.Connect()
ok := token.WaitTimeout(reconnectInterval)
if !ok {
- return errNotConnected
+ return store.ErrNotConnected
}
if token.Error() != nil {
return token.Error()
@@ -256,7 +262,7 @@ func (target *MQTTTarget) initMQTT() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -274,11 +280,11 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa
args.KeepAlive = 10 * time.Second
}
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err)
}
}
@@ -286,13 +292,13 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
- store: store,
+ store: queueStore,
quitCh: make(chan struct{}),
loggerOnce: loggerOnce,
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go
index 7b985e7d5..e5ca61515 100644
--- a/internal/event/target/mysql.go
+++ b/internal/event/target/mysql.go
@@ -33,6 +33,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
)
@@ -153,7 +154,7 @@ type MySQLTarget struct {
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
- store Store
+ store store.Store[event.Event]
firstPing bool
loggerOnce logger.LogOnce
@@ -165,6 +166,11 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *MySQLTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *MySQLTarget) Store() event.TargetStore {
return target.store
@@ -181,7 +187,7 @@ func (target *MySQLTarget) IsActive() (bool, error) {
func (target *MySQLTarget) isActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, err
}
@@ -260,7 +266,7 @@ func (target *MySQLTarget) Send(eventKey string) error {
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -278,7 +284,7 @@ func (target *MySQLTarget) Send(eventKey string) error {
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -363,7 +369,7 @@ func (target *MySQLTarget) initMySQL() error {
err = target.db.Ping()
if err != nil {
- if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
+ if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
} else {
@@ -384,7 +390,7 @@ func (target *MySQLTarget) initMySQL() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -392,11 +398,11 @@ func (target *MySQLTarget) initMySQL() error {
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err)
}
}
@@ -419,13 +425,13 @@ func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQ
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
firstPing: false,
- store: store,
+ store: queueStore,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go
index 70c0f8d8d..ceeec6e32 100644
--- a/internal/event/target/nats.go
+++ b/internal/event/target/nats.go
@@ -28,8 +28,10 @@ import (
"os"
"path/filepath"
+ "github.com/google/uuid"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
@@ -198,10 +200,11 @@ func (n NATSArgs) connectStan() (stan.Conn, error) {
addressURL = scheme + "://" + n.Address.String()
}
- clientID, err := getNewUUID()
+ u, err := uuid.NewRandom()
if err != nil {
return nil, err
}
+ clientID := u.String()
connOpts := []stan.Option{stan.NatsURL(addressURL)}
if n.Streaming.MaxPubAcksInflight > 0 {
@@ -220,7 +223,7 @@ type NATSTarget struct {
natsConn *nats.Conn
stanConn stan.Conn
jstream nats.JetStream
- store Store
+ store store.Store[event.Event]
loggerOnce logger.LogOnce
quitCh chan struct{}
}
@@ -230,6 +233,11 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *NATSTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *NATSTarget) Store() event.TargetStore {
return target.store
@@ -249,19 +257,19 @@ func (target *NATSTarget) isActive() (bool, error) {
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
target.stanConn, connErr = target.args.connectStan()
} else if !target.stanConn.NatsConn().IsConnected() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
} else {
if target.natsConn == nil {
target.natsConn, connErr = target.args.connectNats()
} else if !target.natsConn.IsConnected() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
}
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, connErr
}
@@ -270,7 +278,7 @@ func (target *NATSTarget) isActive() (bool, error) {
target.jstream, connErr = target.natsConn.JetStream()
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, connErr
}
@@ -412,7 +420,7 @@ func (target *NATSTarget) initNATS() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -420,11 +428,11 @@ func (target *NATSTarget) initNATS() error {
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err)
}
}
@@ -433,12 +441,12 @@ func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTa
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
- store: store,
+ store: queueStore,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go
index 395d57a1d..82ce09ba0 100644
--- a/internal/event/target/nsq.go
+++ b/internal/event/target/nsq.go
@@ -31,6 +31,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
)
@@ -94,7 +95,7 @@ type NSQTarget struct {
id event.TargetID
args NSQArgs
producer *nsq.Producer
- store Store
+ store store.Store[event.Event]
config *nsq.Config
loggerOnce logger.LogOnce
quitCh chan struct{}
@@ -105,6 +106,11 @@ func (target *NSQTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *NSQTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *NSQTarget) Store() event.TargetStore {
return target.store
@@ -129,8 +135,8 @@ func (target *NSQTarget) isActive() (bool, error) {
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
- if IsConnRefusedErr(err) {
- return false, errNotConnected
+ if xnet.IsConnRefusedErr(err) {
+ return false, store.ErrNotConnected
}
return false, err
}
@@ -234,7 +240,7 @@ func (target *NSQTarget) initNSQ() error {
err = target.producer.Ping()
if err != nil {
// To treat "connection refused" errors as errNotConnected.
- if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
+ if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
target.producer.Stop()
@@ -246,7 +252,7 @@ func (target *NSQTarget) initNSQ() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -254,11 +260,11 @@ func (target *NSQTarget) initNSQ() error {
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err)
}
}
@@ -267,12 +273,12 @@ func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarge
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
loggerOnce: loggerOnce,
- store: store,
+ store: queueStore,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go
index 02d5d98b4..e91fc7b40 100644
--- a/internal/event/target/postgresql.go
+++ b/internal/event/target/postgresql.go
@@ -34,6 +34,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
)
@@ -145,7 +146,7 @@ type PostgreSQLTarget struct {
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
- store Store
+ store store.Store[event.Event]
firstPing bool
connString string
loggerOnce logger.LogOnce
@@ -157,6 +158,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *PostgreSQLTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *PostgreSQLTarget) Store() event.TargetStore {
return target.store
@@ -173,7 +179,7 @@ func (target *PostgreSQLTarget) IsActive() (bool, error) {
func (target *PostgreSQLTarget) isActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, err
}
@@ -198,7 +204,7 @@ func (target *PostgreSQLTarget) Save(eventData event.Event) error {
// IsConnErr - To detect a connection error.
func IsConnErr(err error) bool {
- return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
+ return xnet.IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
}
// send - sends an event to the PostgreSQL.
@@ -255,7 +261,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error {
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -273,7 +279,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error {
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -357,7 +363,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error {
err = target.db.Ping()
if err != nil {
- if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
+ if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
} else {
@@ -378,7 +384,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -407,11 +413,11 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
}
connStr := strings.Join(params, " ")
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err)
}
}
@@ -420,14 +426,14 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
- store: store,
+ store: queueStore,
connString: connStr,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go
index cfc15a132..40f7ba256 100644
--- a/internal/event/target/redis.go
+++ b/internal/event/target/redis.go
@@ -31,6 +31,7 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/net"
)
@@ -122,7 +123,7 @@ type RedisTarget struct {
id event.TargetID
args RedisArgs
pool *redis.Pool
- store Store
+ store store.Store[event.Event]
firstPing bool
loggerOnce logger.LogOnce
quitCh chan struct{}
@@ -133,6 +134,11 @@ func (target *RedisTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *RedisTarget) Name() string {
+ return target.ID().String()
+}
+
// Store returns any underlying store if set.
func (target *RedisTarget) Store() event.TargetStore {
return target.store
@@ -152,8 +158,8 @@ func (target *RedisTarget) isActive() (bool, error) {
_, pingErr := conn.Do("PING")
if pingErr != nil {
- if IsConnRefusedErr(pingErr) {
- return false, errNotConnected
+ if xnet.IsConnRefusedErr(pingErr) {
+ return false, store.ErrNotConnected
}
return false, pingErr
}
@@ -227,16 +233,16 @@ func (target *RedisTarget) Send(eventKey string) error {
_, pingErr := conn.Do("PING")
if pingErr != nil {
- if IsConnRefusedErr(pingErr) {
- return errNotConnected
+ if xnet.IsConnRefusedErr(pingErr) {
+ return store.ErrNotConnected
}
return pingErr
}
if !target.firstPing {
if err := target.args.validateFormat(conn); err != nil {
- if IsConnRefusedErr(err) {
- return errNotConnected
+ if xnet.IsConnRefusedErr(err) {
+ return store.ErrNotConnected
}
return err
}
@@ -254,8 +260,8 @@ func (target *RedisTarget) Send(eventKey string) error {
}
if err := target.send(eventData); err != nil {
- if IsConnRefusedErr(err) {
- return errNotConnected
+ if xnet.IsConnRefusedErr(err) {
+ return store.ErrNotConnected
}
return err
}
@@ -280,7 +286,7 @@ func (target *RedisTarget) initRedis() error {
_, pingErr := conn.Do("PING")
if pingErr != nil {
- if !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
+ if !(xnet.IsConnRefusedErr(pingErr) || xnet.IsConnResetErr(pingErr)) {
target.loggerOnce(context.Background(), pingErr, target.ID().String())
}
return pingErr
@@ -298,7 +304,7 @@ func (target *RedisTarget) initRedis() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -306,11 +312,11 @@ func (target *RedisTarget) initRedis() error {
// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) {
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err)
}
}
@@ -349,13 +355,13 @@ func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*Redi
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
- store: store,
+ store: queueStore,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/store.go b/internal/event/target/store.go
deleted file mode 100644
index 199c21a9e..000000000
--- a/internal/event/target/store.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package target
-
-import (
- "context"
- "errors"
- "fmt"
- "strings"
- "syscall"
- "time"
-
- "github.com/minio/minio/internal/event"
- "github.com/minio/minio/internal/logger"
-)
-
-const retryInterval = 3 * time.Second
-
-// errNotConnected - indicates that the target connection is not active.
-var errNotConnected = errors.New("not connected to target server/service")
-
-// errLimitExceeded error is sent when the maximum limit is reached.
-var errLimitExceeded = errors.New("the maximum store limit reached")
-
-// Store - To persist the events.
-type Store interface {
- Put(event event.Event) error
- Get(key string) (event.Event, error)
- Len() int
- List() ([]string, error)
- Del(key string) error
- Open() error
-}
-
-// replayEvents - Reads the events from the store and replays.
-func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string {
- eventKeyCh := make(chan string)
-
- go func() {
- defer close(eventKeyCh)
-
- retryTicker := time.NewTicker(retryInterval)
- defer retryTicker.Stop()
-
- for {
- names, err := store.List()
- if err != nil {
- loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id)
- } else {
- for _, name := range names {
- select {
- case eventKeyCh <- strings.TrimSuffix(name, eventExt):
- // Get next key.
- case <-doneCh:
- return
- }
- }
- }
-
- select {
- case <-retryTicker.C:
- case <-doneCh:
- return
- }
- }
- }()
-
- return eventKeyCh
-}
-
-// IsConnRefusedErr - To check fot "connection refused" error.
-func IsConnRefusedErr(err error) bool {
- return errors.Is(err, syscall.ECONNREFUSED)
-}
-
-// IsConnResetErr - Checks for connection reset errors.
-func IsConnResetErr(err error) bool {
- if strings.Contains(err.Error(), "connection reset by peer") {
- return true
- }
- // incase if error message is wrapped.
- return errors.Is(err, syscall.ECONNRESET)
-}
-
-// sendEvents - Reads events from the store and re-plays.
-func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
- retryTicker := time.NewTicker(retryInterval)
- defer retryTicker.Stop()
-
- send := func(eventKey string) bool {
- for {
- err := target.Send(eventKey)
- if err == nil {
- break
- }
-
- if err != errNotConnected && !IsConnResetErr(err) {
- loggerOnce(context.Background(),
- fmt.Errorf("target.Send() failed with '%w'", err),
- target.ID().String())
- }
-
- // Retrying after 3secs back-off
-
- select {
- case <-retryTicker.C:
- case <-doneCh:
- return false
- }
- }
- return true
- }
-
- for {
- select {
- case eventKey, ok := <-eventKeyCh:
- if !ok {
- // closed channel.
- return
- }
-
- if !send(eventKey) {
- return
- }
- case <-doneCh:
- return
- }
- }
-}
-
-func streamEventsFromStore(store Store, target event.Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
- go func() {
- // Replays the events from the store.
- eventKeyCh := replayEvents(store, doneCh, loggerOnce, target.ID().String())
- // Send events from the store.
- sendEvents(target, eventKeyCh, doneCh, loggerOnce)
- }()
-}
diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go
index 9b16893ad..b7feb9ad1 100644
--- a/internal/event/target/webhook.go
+++ b/internal/event/target/webhook.go
@@ -35,6 +35,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
+ "github.com/minio/minio/internal/store"
"github.com/minio/pkg/certs"
xnet "github.com/minio/pkg/net"
)
@@ -96,7 +97,7 @@ type WebhookTarget struct {
args WebhookArgs
transport *http.Transport
httpClient *http.Client
- store Store
+ store store.Store[event.Event]
loggerOnce logger.LogOnce
cancel context.CancelFunc
cancelCh <-chan struct{}
@@ -107,6 +108,11 @@ func (target *WebhookTarget) ID() event.TargetID {
return target.id
}
+// Name - returns the Name of the target.
+func (target *WebhookTarget) Name() string {
+ return target.ID().String()
+}
+
// IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
@@ -127,7 +133,7 @@ func (target *WebhookTarget) isActive() (bool, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, err
}
@@ -142,7 +148,7 @@ func (target *WebhookTarget) isActive() (bool, error) {
resp, err := target.httpClient.Do(req)
if err != nil {
if xnet.IsNetworkOrHostDown(err, true) {
- return false, errNotConnected
+ return false, store.ErrNotConnected
}
return false, err
}
@@ -165,7 +171,7 @@ func (target *WebhookTarget) Save(eventData event.Event) error {
err := target.send(eventData)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
- return errNotConnected
+ return store.ErrNotConnected
}
}
return err
@@ -235,7 +241,7 @@ func (target *WebhookTarget) Send(eventKey string) error {
if err := target.send(eventData); err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
- return errNotConnected
+ return store.ErrNotConnected
}
return err
}
@@ -274,7 +280,7 @@ func (target *WebhookTarget) initWebhook() error {
return err
}
if !yes {
- return errNotConnected
+ return store.ErrNotConnected
}
return nil
@@ -284,11 +290,11 @@ func (target *WebhookTarget) initWebhook() error {
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) {
ctx, cancel := context.WithCancel(ctx)
- var store Store
+ var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
- store = NewQueueStore(queueDir, args.QueueLimit)
- if err := store.Open(); err != nil {
+ queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
+ if err := queueStore.Open(); err != nil {
cancel()
return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err)
}
@@ -299,13 +305,13 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
args: args,
loggerOnce: loggerOnce,
transport: transport,
- store: store,
+ store: queueStore,
cancel: cancel,
cancelCh: ctx.Done(),
}
if target.store != nil {
- streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce)
+ store.StreamItems(target.store, target, target.cancelCh, target.loggerOnce)
}
return target, nil
diff --git a/internal/event/target/queuestore.go b/internal/store/queuestore.go
similarity index 65%
rename from internal/event/target/queuestore.go
rename to internal/store/queuestore.go
index 38d9f8922..d4a2968ac 100644
--- a/internal/event/target/queuestore.go
+++ b/internal/store/queuestore.go
@@ -15,10 +15,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package target
+package store
import (
"encoding/json"
+ "errors"
"os"
"path/filepath"
"sort"
@@ -26,38 +27,47 @@ import (
"sync"
"time"
- "github.com/minio/minio/internal/event"
+ "github.com/google/uuid"
)
const (
defaultLimit = 100000 // Default store limit.
- eventExt = ".event"
+ defaultExt = ".unknown"
)
-// QueueStore - Filestore for persisting events.
-type QueueStore struct {
+// errLimitExceeded error is sent when the maximum limit is reached.
+var errLimitExceeded = errors.New("the maximum store limit reached")
+
+// QueueStore - Filestore for persisting items.
+type QueueStore[_ any] struct {
sync.RWMutex
entryLimit uint64
directory string
+ fileExt string
entries map[string]int64 // key -> modtime as unix nano
}
// NewQueueStore - Creates an instance for QueueStore.
-func NewQueueStore(directory string, limit uint64) Store {
+func NewQueueStore[I any](directory string, limit uint64, ext string) *QueueStore[I] {
if limit == 0 {
limit = defaultLimit
}
- return &QueueStore{
+ if ext == "" {
+ ext = defaultExt
+ }
+
+ return &QueueStore[I]{
directory: directory,
entryLimit: limit,
+ fileExt: ext,
entries: make(map[string]int64, limit),
}
}
// Open - Creates the directory if not present.
-func (store *QueueStore) Open() error {
+func (store *QueueStore[_]) Open() error {
store.Lock()
defer store.Unlock()
@@ -78,7 +88,7 @@ func (store *QueueStore) Open() error {
if file.IsDir() {
continue
}
- key := strings.TrimSuffix(file.Name(), eventExt)
+ key := strings.TrimSuffix(file.Name(), store.fileExt)
if fi, err := file.Info(); err == nil {
store.entries[key] = fi.ModTime().UnixNano()
}
@@ -87,44 +97,45 @@ func (store *QueueStore) Open() error {
return nil
}
-// write - writes event to the directory.
-func (store *QueueStore) write(key string, e event.Event) error {
- // Marshalls the event.
- eventData, err := json.Marshal(e)
+// write - writes an item to the directory.
+func (store *QueueStore[I]) write(key string, item I) error {
+ // Marshalls the item.
+ eventData, err := json.Marshal(item)
if err != nil {
return err
}
- path := filepath.Join(store.directory, key+eventExt)
+ path := filepath.Join(store.directory, key+store.fileExt)
if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil {
return err
}
- // Increment the event count.
+ // Increment the item count.
store.entries[key] = time.Now().UnixNano()
return nil
}
-// Put - puts a event to the store.
-func (store *QueueStore) Put(e event.Event) error {
+// Put - puts an item to the store.
+func (store *QueueStore[I]) Put(item I) error {
store.Lock()
defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded
}
- key, err := getNewUUID()
+ // Generate a new UUID for the key.
+ key, err := uuid.NewRandom()
if err != nil {
return err
}
- return store.write(key, e)
+ return store.write(key.String(), item)
}
-// Get - gets a event from the store.
-func (store *QueueStore) Get(key string) (event event.Event, err error) {
+// Get - gets an item from the store.
+func (store *QueueStore[I]) Get(key string) (item I, err error) {
store.RLock()
- defer func(store *QueueStore) {
+ defer func(store *QueueStore[I]) {
store.RUnlock()
if err != nil {
// Upon error we remove the entry.
@@ -133,31 +144,31 @@ func (store *QueueStore) Get(key string) (event event.Event, err error) {
}(store)
var eventData []byte
- eventData, err = os.ReadFile(filepath.Join(store.directory, key+eventExt))
+ eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt))
if err != nil {
- return event, err
+ return item, err
}
if len(eventData) == 0 {
- return event, os.ErrNotExist
+ return item, os.ErrNotExist
}
- if err = json.Unmarshal(eventData, &event); err != nil {
- return event, err
+ if err = json.Unmarshal(eventData, &item); err != nil {
+ return item, err
}
- return event, nil
+ return item, nil
}
// Del - Deletes an entry from the store.
-func (store *QueueStore) Del(key string) error {
+func (store *QueueStore[_]) Del(key string) error {
store.Lock()
defer store.Unlock()
return store.del(key)
}
// Len returns the entry count.
-func (store *QueueStore) Len() int {
+func (store *QueueStore[_]) Len() int {
store.RLock()
l := len(store.entries)
defer store.RUnlock()
@@ -165,8 +176,8 @@ func (store *QueueStore) Len() int {
}
// lockless call
-func (store *QueueStore) del(key string) error {
- err := os.Remove(filepath.Join(store.directory, key+eventExt))
+func (store *QueueStore[_]) del(key string) error {
+ err := os.Remove(filepath.Join(store.directory, key+store.fileExt))
// Delete as entry no matter the result
delete(store.entries, key)
@@ -175,7 +186,7 @@ func (store *QueueStore) del(key string) error {
}
// List - lists all files registered in the store.
-func (store *QueueStore) List() ([]string, error) {
+func (store *QueueStore[_]) List() ([]string, error) {
store.RLock()
l := make([]string, 0, len(store.entries))
for k := range store.entries {
@@ -194,7 +205,7 @@ func (store *QueueStore) List() ([]string, error) {
// list will read all entries from disk.
// Entries are returned sorted by modtime, oldest first.
// Underlying entry list in store is *not* updated.
-func (store *QueueStore) list() ([]os.DirEntry, error) {
+func (store *QueueStore[_]) list() ([]os.DirEntry, error) {
files, err := os.ReadDir(store.directory)
if err != nil {
return nil, err
@@ -215,3 +226,9 @@ func (store *QueueStore) list() ([]os.DirEntry, error) {
return files, nil
}
+
+// Extension will return the file extension used
+// for the items stored in the queue.
+func (store *QueueStore[_]) Extension() string {
+ return store.fileExt
+}
diff --git a/internal/event/target/queuestore_test.go b/internal/store/queuestore_test.go
similarity index 62%
rename from internal/event/target/queuestore_test.go
rename to internal/store/queuestore_test.go
index 7c5637d90..680cb177b 100644
--- a/internal/event/target/queuestore_test.go
+++ b/internal/store/queuestore_test.go
@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package target
+package store
import (
"os"
@@ -23,48 +23,54 @@ import (
"reflect"
"strings"
"testing"
-
- "github.com/minio/minio/internal/event"
)
-// TestDir
-var queueDir = filepath.Join(os.TempDir(), "minio_test")
-
-// Sample test event.
-var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet}
-
-// Initialize the store.
-func setUpStore(directory string, limit uint64) (Store, error) {
- store := NewQueueStore(queueDir, limit)
- if oErr := store.Open(); oErr != nil {
- return nil, oErr
- }
- return store, nil
+type TestItem struct {
+ Name string `json:"Name"`
+ Property string `json:"property"`
}
-// Tear down store
-func tearDownStore() error {
+var (
+ // TestDir
+ queueDir = filepath.Join(os.TempDir(), "minio_test")
+ // Sample test item.
+ testItem = TestItem{Name: "test-item", Property: "property"}
+ // Ext for test item
+ testItemExt = ".test"
+)
+
+// Initialize the queue store.
+func setUpQueueStore(directory string, limit uint64) (Store[TestItem], error) {
+ queueStore := NewQueueStore[TestItem](queueDir, limit, testItemExt)
+ if oErr := queueStore.Open(); oErr != nil {
+ return nil, oErr
+ }
+ return queueStore, nil
+}
+
+// Tear down queue store.
+func tearDownQueueStore() error {
return os.RemoveAll(queueDir)
}
// TestQueueStorePut - tests for store.Put
func TestQueueStorePut(t *testing.T) {
defer func() {
- if err := tearDownStore(); err != nil {
+ if err := tearDownQueueStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
- store, err := setUpStore(queueDir, 100)
+ store, err := setUpQueueStore(queueDir, 100)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
- // Put 100 events.
+ // Put 100 items.
for i := 0; i < 100; i++ {
- if err := store.Put(testEvent); err != nil {
+ if err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
- // Count the events.
+ // Count the items.
names, err := store.List()
if err != nil {
t.Fatal(err)
@@ -77,71 +83,71 @@ func TestQueueStorePut(t *testing.T) {
// TestQueueStoreGet - tests for store.Get
func TestQueueStoreGet(t *testing.T) {
defer func() {
- if err := tearDownStore(); err != nil {
+ if err := tearDownQueueStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
- store, err := setUpStore(queueDir, 10)
+ store, err := setUpQueueStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
- // Put 10 events
+ // Put 10 items
for i := 0; i < 10; i++ {
- if err := store.Put(testEvent); err != nil {
+ if err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
- eventKeys, err := store.List()
+ itemKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
- // Get 10 events.
- if len(eventKeys) == 10 {
- for _, key := range eventKeys {
- event, eErr := store.Get(strings.TrimSuffix(key, eventExt))
+ // Get 10 items.
+ if len(itemKeys) == 10 {
+ for _, key := range itemKeys {
+ item, eErr := store.Get(strings.TrimSuffix(key, testItemExt))
if eErr != nil {
- t.Fatal("Failed to Get the event from the queue store ", eErr)
+ t.Fatal("Failed to Get the item from the queue store ", eErr)
}
- if !reflect.DeepEqual(testEvent, event) {
- t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
+ if !reflect.DeepEqual(testItem, item) {
+ t.Fatalf("Failed to read the item: error: expected = %v, got = %v", testItem, item)
}
}
} else {
- t.Fatalf("List() Expected: 10, got %d", len(eventKeys))
+ t.Fatalf("List() Expected: 10, got %d", len(itemKeys))
}
}
// TestQueueStoreDel - tests for store.Del
func TestQueueStoreDel(t *testing.T) {
defer func() {
- if err := tearDownStore(); err != nil {
+ if err := tearDownQueueStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
- store, err := setUpStore(queueDir, 20)
+ store, err := setUpQueueStore(queueDir, 20)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
- // Put 20 events.
+ // Put 20 items.
for i := 0; i < 20; i++ {
- if err := store.Put(testEvent); err != nil {
+ if err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
- eventKeys, err := store.List()
+ itemKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
- // Remove all the events.
- if len(eventKeys) == 20 {
- for _, key := range eventKeys {
- err := store.Del(strings.TrimSuffix(key, eventExt))
+ // Remove all the items.
+ if len(itemKeys) == 20 {
+ for _, key := range itemKeys {
+ err := store.Del(strings.TrimSuffix(key, testItemExt))
if err != nil {
t.Fatal("queue store Del failed with ", err)
}
}
} else {
- t.Fatalf("List() Expected: 20, got %d", len(eventKeys))
+ t.Fatalf("List() Expected: 20, got %d", len(itemKeys))
}
names, err := store.List()
@@ -153,25 +159,25 @@ func TestQueueStoreDel(t *testing.T) {
}
}
-// TestQueueStoreLimit - tests the event limit for the store.
+// TestQueueStoreLimit - tests the item limit for the store.
func TestQueueStoreLimit(t *testing.T) {
defer func() {
- if err := tearDownStore(); err != nil {
+ if err := tearDownQueueStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
// The max limit is set to 5.
- store, err := setUpStore(queueDir, 5)
+ store, err := setUpQueueStore(queueDir, 5)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 5; i++ {
- if err := store.Put(testEvent); err != nil {
+ if err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Should not allow 6th Put.
- if err := store.Put(testEvent); err == nil {
+ if err := store.Put(testItem); err == nil {
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
}
}
@@ -179,20 +185,20 @@ func TestQueueStoreLimit(t *testing.T) {
// TestQueueStoreLimit - tests for store.LimitN.
func TestQueueStoreListN(t *testing.T) {
defer func() {
- if err := tearDownStore(); err != nil {
+ if err := tearDownQueueStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
- store, err := setUpStore(queueDir, 10)
+ store, err := setUpQueueStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 10; i++ {
- if err := store.Put(testEvent); err != nil {
+ if err := store.Put(testItem); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
- // Should return all the event keys in the store.
+ // Should return all the item keys in the store.
names, err := store.List()
if err != nil {
t.Fatal(err)
@@ -203,7 +209,7 @@ func TestQueueStoreListN(t *testing.T) {
}
// re-open
- store, err = setUpStore(queueDir, 10)
+ store, err = setUpQueueStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
diff --git a/internal/store/store.go b/internal/store/store.go
new file mode 100644
index 000000000..b8428a71f
--- /dev/null
+++ b/internal/store/store.go
@@ -0,0 +1,145 @@
+// Copyright (c) 2015-2023 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package store
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/minio/minio/internal/logger"
+ xnet "github.com/minio/pkg/net"
+)
+
+const (
+ retryInterval = 3 * time.Second
+)
+
+// ErrNotConnected - indicates that the target connection is not active.
+var ErrNotConnected = errors.New("not connected to target server/service")
+
+// Target - store target interface
+type Target interface {
+ Name() string
+ Send(key string) error
+}
+
+// Store - Used to persist items.
+type Store[I any] interface {
+ Put(item I) error
+ Get(key string) (I, error)
+ Len() int
+ List() ([]string, error)
+ Del(key string) error
+ Open() error
+ Extension() string
+}
+
+// replayItems - Reads the items from the store and replays.
+func replayItems[I any](store Store[I], doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string {
+ itemKeyCh := make(chan string)
+
+ go func() {
+ defer close(itemKeyCh)
+
+ retryTicker := time.NewTicker(retryInterval)
+ defer retryTicker.Stop()
+
+ for {
+ names, err := store.List()
+ if err != nil {
+ loggerOnce(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id)
+ } else {
+ for _, name := range names {
+ select {
+ case itemKeyCh <- strings.TrimSuffix(name, store.Extension()):
+ // Get next key.
+ case <-doneCh:
+ return
+ }
+ }
+ }
+
+ select {
+ case <-retryTicker.C:
+ case <-doneCh:
+ return
+ }
+ }
+ }()
+
+ return itemKeyCh
+}
+
+// sendItems - Reads items from the store and re-plays.
+func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
+ retryTicker := time.NewTicker(retryInterval)
+ defer retryTicker.Stop()
+
+ send := func(itemKey string) bool {
+ for {
+ err := target.Send(itemKey)
+ if err == nil {
+ break
+ }
+
+ if err != ErrNotConnected && !xnet.IsConnResetErr(err) {
+ loggerOnce(context.Background(),
+ fmt.Errorf("target.Send() failed with '%w'", err),
+ target.Name())
+ }
+
+ // Retrying after 3secs back-off
+
+ select {
+ case <-retryTicker.C:
+ case <-doneCh:
+ return false
+ }
+ }
+ return true
+ }
+
+ for {
+ select {
+ case itemKey, ok := <-itemKeyCh:
+ if !ok {
+ // closed channel.
+ return
+ }
+
+ if !send(itemKey) {
+ return
+ }
+ case <-doneCh:
+ return
+ }
+ }
+}
+
+// StreamItems reads the keys from the store and replays the corresponding item to the target.
+func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
+ go func() {
+ // Replays the items from the store.
+ itemKeyCh := replayItems(store, doneCh, loggerOnce, target.Name())
+ // Send items from the store.
+ sendItems(target, itemKeyCh, doneCh, loggerOnce)
+ }()
+}