From 51f7f9aaa38f23d8de5ba606eb08836cfcf1370e Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Tue, 4 Apr 2023 23:22:24 +0530 Subject: [PATCH] Generalize the event store using go generics (#16910) --- go.mod | 2 +- go.sum | 4 +- internal/event/event.go | 3 + internal/event/target/amqp.go | 24 +-- internal/event/target/common.go | 29 ---- internal/event/target/elasticsearch.go | 28 ++-- internal/event/target/kafka.go | 28 ++-- internal/event/target/mqtt.go | 26 +-- internal/event/target/mysql.go | 28 ++-- internal/event/target/nats.go | 32 ++-- internal/event/target/nsq.go | 26 +-- internal/event/target/postgresql.go | 30 ++-- internal/event/target/redis.go | 38 +++-- internal/event/target/store.go | 153 ------------------ internal/event/target/webhook.go | 28 ++-- .../{event/target => store}/queuestore.go | 85 ++++++---- .../target => store}/queuestore_test.go | 118 +++++++------- internal/store/store.go | 145 +++++++++++++++++ 18 files changed, 439 insertions(+), 388 deletions(-) delete mode 100644 internal/event/target/common.go delete mode 100644 internal/event/target/store.go rename internal/{event/target => store}/queuestore.go (65%) rename internal/{event/target => store}/queuestore_test.go (62%) create mode 100644 internal/store/store.go diff --git a/go.mod b/go.mod index 89a446c58..3ccc29d06 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/minio/madmin-go/v2 v2.0.17 github.com/minio/minio-go/v7 v7.0.50 github.com/minio/mux v1.9.0 - github.com/minio/pkg v1.6.5 + github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c github.com/minio/selfupdate v0.6.0 github.com/minio/sha256-simd v1.0.0 github.com/minio/simdjson-go v0.4.5 diff --git a/go.sum b/go.sum index 49a5f05af..7d9318124 100644 --- a/go.sum +++ b/go.sum @@ -786,8 +786,8 @@ github.com/minio/minio-go/v7 v7.0.50/go.mod h1:IbbodHyjUAguneyucUaahv+VMNs/EOTV9 github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA= github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ= github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA= -github.com/minio/pkg v1.6.5 h1:T9cRNcCLJTFFgQGH0Rzr1CtAWLAIchTsbE0lSztCf40= -github.com/minio/pkg v1.6.5/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= +github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c h1:Ukw0+d0T/+9lserJfodr4HGIIb3hLkt8tlgMh2vUfZg= +github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU= github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= diff --git a/internal/event/event.go b/internal/event/event.go index 7c6846978..e01a801a6 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -30,6 +30,9 @@ const ( // AMZTimeFormat - event time format. AMZTimeFormat = "2006-01-02T15:04:05.000Z" + + // StoreExtension - file extension of an event file in store + StoreExtension = ".event" ) // Identity represents access key who caused the event. diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index c838a5cc5..cc2356a72 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/rabbitmq/amqp091-go" ) @@ -117,7 +118,7 @@ type AMQPTarget struct { args AMQPArgs conn *amqp091.Connection connMutex sync.Mutex - store Store + store store.Store[event.Event] loggerOnce logger.LogOnce quitCh chan struct{} @@ -128,6 +129,11 @@ func (target *AMQPTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *AMQPTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *AMQPTarget) Store() event.TargetStore { return target.store @@ -197,8 +203,8 @@ func (target *AMQPTarget) channel() (*amqp091.Channel, chan amqp091.Confirmation conn, err = amqp091.Dial(target.args.URL.String()) if err != nil { - if IsConnRefusedErr(err) { - return nil, nil, errNotConnected + if xnet.IsConnRefusedErr(err) { + return nil, nil, store.ErrNotConnected } return nil, nil, err } @@ -330,7 +336,7 @@ func (target *AMQPTarget) init() error { func (target *AMQPTarget) initAMQP() error { conn, err := amqp091.Dial(target.args.URL.String()) if err != nil { - if IsConnRefusedErr(err) || IsConnResetErr(err) { + if xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err) { target.loggerOnce(context.Background(), err, target.ID().String()) } return err @@ -342,11 +348,11 @@ func (target *AMQPTarget) initAMQP() error { // NewAMQPTarget - creates new AMQP target. func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err) } } @@ -355,12 +361,12 @@ func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTa id: event.TargetID{ID: id, Name: "amqp"}, args: args, loggerOnce: loggerOnce, - store: store, + store: queueStore, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/common.go b/internal/event/target/common.go deleted file mode 100644 index 9a5eae111..000000000 --- a/internal/event/target/common.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package target - -import "github.com/google/uuid" - -func getNewUUID() (string, error) { - u, err := uuid.NewRandom() - if err != nil { - return "", err - } - - return u.String(), nil -} diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 80a9b81e1..b8f6eee38 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -36,6 +36,7 @@ import ( "github.com/minio/highwayhash" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/pkg/errors" ) @@ -157,7 +158,7 @@ type ElasticsearchTarget struct { id event.TargetID args ElasticsearchArgs client esClient - store Store + store store.Store[event.Event] loggerOnce logger.LogOnce quitCh chan struct{} } @@ -167,6 +168,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *ElasticsearchTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *ElasticsearchTarget) Store() event.TargetStore { return target.store @@ -212,7 +218,7 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error { err = target.send(eventData) if xnet.IsNetworkOrHostDown(err, false) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -284,7 +290,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { if err := target.send(eventData); err != nil { if xnet.IsNetworkOrHostDown(err, false) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -348,7 +354,7 @@ func (target *ElasticsearchTarget) initElasticsearch() error { err := target.checkAndInitClient(ctx) if err != nil { - if err != errNotConnected { + if err != store.ErrNotConnected { target.loggerOnce(context.Background(), err, target.ID().String()) } return err @@ -359,11 +365,11 @@ func (target *ElasticsearchTarget) initElasticsearch() error { // NewElasticsearchTarget - creates new Elasticsearch target. func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err) } } @@ -371,13 +377,13 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, - store: store, + store: queueStore, loggerOnce: loggerOnce, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil @@ -415,7 +421,7 @@ func (c *esClientV7) getServerSupportStatus(ctx context.Context) (ESSupportStatu c.Info.WithContext(ctx), ) if err != nil { - return ESSUnknown, "", errNotConnected + return ESSUnknown, "", store.ErrNotConnected } defer resp.Body.Close() @@ -485,7 +491,7 @@ func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error c.Ping.WithContext(ctx), ) if err != nil { - return false, errNotConnected + return false, store.ErrNotConnected } io.Copy(io.Discard, resp.Body) resp.Body.Close() diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index de2483f75..9894a42d4 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -31,6 +31,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/Shopify/sarama" @@ -129,7 +130,7 @@ type KafkaTarget struct { args KafkaArgs producer sarama.SyncProducer config *sarama.Config - store Store + store store.Store[event.Event] loggerOnce logger.LogOnce quitCh chan struct{} } @@ -139,6 +140,11 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *KafkaTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *KafkaTarget) Store() event.TargetStore { return target.store @@ -154,7 +160,7 @@ func (target *KafkaTarget) IsActive() (bool, error) { func (target *KafkaTarget) isActive() (bool, error) { if !target.args.pingBrokers() { - return false, errNotConnected + return false, store.ErrNotConnected } return true, nil } @@ -178,7 +184,7 @@ func (target *KafkaTarget) Save(eventData event.Event) error { // send - sends an event to the kafka. func (target *KafkaTarget) send(eventData event.Event) error { if target.producer == nil { - return errNotConnected + return store.ErrNotConnected } objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -224,7 +230,7 @@ func (target *KafkaTarget) Send(eventKey string) error { if err != sarama.ErrOutOfBrokers { return err } - return errNotConnected + return store.ErrNotConnected } } @@ -242,7 +248,7 @@ func (target *KafkaTarget) Send(eventKey string) error { if err != nil { // Sarama opens the ciruit breaker after 3 consecutive connection failures. if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" { - return errNotConnected + return store.ErrNotConnected } return err } @@ -330,7 +336,7 @@ func (target *KafkaTarget) initKafka() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -338,11 +344,11 @@ func (target *KafkaTarget) initKafka() error { // NewKafkaTarget - creates new Kafka target with auth credentials. func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err) } } @@ -350,13 +356,13 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk target := &KafkaTarget{ id: event.TargetID{ID: id, Name: "kafka"}, args: args, - store: store, + store: queueStore, loggerOnce: loggerOnce, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index 4f83f55c7..c5f315c1d 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -31,6 +31,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -111,7 +112,7 @@ type MQTTTarget struct { id event.TargetID args MQTTArgs client mqtt.Client - store Store + store store.Store[event.Event] quitCh chan struct{} loggerOnce logger.LogOnce } @@ -121,6 +122,11 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *MQTTTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *MQTTTarget) Store() event.TargetStore { return target.store @@ -136,7 +142,7 @@ func (target *MQTTTarget) IsActive() (bool, error) { func (target *MQTTTarget) isActive() (bool, error) { if !target.client.IsConnectionOpen() { - return false, errNotConnected + return false, store.ErrNotConnected } return true, nil } @@ -156,7 +162,7 @@ func (target *MQTTTarget) send(eventData event.Event) error { token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data)) if !token.WaitTimeout(reconnectInterval) { - return errNotConnected + return store.ErrNotConnected } return token.Error() } @@ -245,7 +251,7 @@ func (target *MQTTTarget) initMQTT() error { token := target.client.Connect() ok := token.WaitTimeout(reconnectInterval) if !ok { - return errNotConnected + return store.ErrNotConnected } if token.Error() != nil { return token.Error() @@ -256,7 +262,7 @@ func (target *MQTTTarget) initMQTT() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -274,11 +280,11 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa args.KeepAlive = 10 * time.Second } - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err) } } @@ -286,13 +292,13 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa target := &MQTTTarget{ id: event.TargetID{ID: id, Name: "mqtt"}, args: args, - store: store, + store: queueStore, quitCh: make(chan struct{}), loggerOnce: loggerOnce, } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index 7b985e7d5..e5ca61515 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -33,6 +33,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -153,7 +154,7 @@ type MySQLTarget struct { deleteStmt *sql.Stmt insertStmt *sql.Stmt db *sql.DB - store Store + store store.Store[event.Event] firstPing bool loggerOnce logger.LogOnce @@ -165,6 +166,11 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *MySQLTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *MySQLTarget) Store() event.TargetStore { return target.store @@ -181,7 +187,7 @@ func (target *MySQLTarget) IsActive() (bool, error) { func (target *MySQLTarget) isActive() (bool, error) { if err := target.db.Ping(); err != nil { if IsConnErr(err) { - return false, errNotConnected + return false, store.ErrNotConnected } return false, err } @@ -260,7 +266,7 @@ func (target *MySQLTarget) Send(eventKey string) error { if !target.firstPing { if err := target.executeStmts(); err != nil { if IsConnErr(err) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -278,7 +284,7 @@ func (target *MySQLTarget) Send(eventKey string) error { if err := target.send(eventData); err != nil { if IsConnErr(err) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -363,7 +369,7 @@ func (target *MySQLTarget) initMySQL() error { err = target.db.Ping() if err != nil { - if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) { target.loggerOnce(context.Background(), err, target.ID().String()) } } else { @@ -384,7 +390,7 @@ func (target *MySQLTarget) initMySQL() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -392,11 +398,11 @@ func (target *MySQLTarget) initMySQL() error { // NewMySQLTarget - creates new MySQL target. func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err) } } @@ -419,13 +425,13 @@ func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQ id: event.TargetID{ID: id, Name: "mysql"}, args: args, firstPing: false, - store: store, + store: queueStore, loggerOnce: loggerOnce, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index 70c0f8d8d..ceeec6e32 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -28,8 +28,10 @@ import ( "os" "path/filepath" + "github.com/google/uuid" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/nats-io/nats.go" "github.com/nats-io/stan.go" @@ -198,10 +200,11 @@ func (n NATSArgs) connectStan() (stan.Conn, error) { addressURL = scheme + "://" + n.Address.String() } - clientID, err := getNewUUID() + u, err := uuid.NewRandom() if err != nil { return nil, err } + clientID := u.String() connOpts := []stan.Option{stan.NatsURL(addressURL)} if n.Streaming.MaxPubAcksInflight > 0 { @@ -220,7 +223,7 @@ type NATSTarget struct { natsConn *nats.Conn stanConn stan.Conn jstream nats.JetStream - store Store + store store.Store[event.Event] loggerOnce logger.LogOnce quitCh chan struct{} } @@ -230,6 +233,11 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *NATSTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *NATSTarget) Store() event.TargetStore { return target.store @@ -249,19 +257,19 @@ func (target *NATSTarget) isActive() (bool, error) { if target.stanConn == nil || target.stanConn.NatsConn() == nil { target.stanConn, connErr = target.args.connectStan() } else if !target.stanConn.NatsConn().IsConnected() { - return false, errNotConnected + return false, store.ErrNotConnected } } else { if target.natsConn == nil { target.natsConn, connErr = target.args.connectNats() } else if !target.natsConn.IsConnected() { - return false, errNotConnected + return false, store.ErrNotConnected } } if connErr != nil { if connErr.Error() == nats.ErrNoServers.Error() { - return false, errNotConnected + return false, store.ErrNotConnected } return false, connErr } @@ -270,7 +278,7 @@ func (target *NATSTarget) isActive() (bool, error) { target.jstream, connErr = target.natsConn.JetStream() if connErr != nil { if connErr.Error() == nats.ErrNoServers.Error() { - return false, errNotConnected + return false, store.ErrNotConnected } return false, connErr } @@ -412,7 +420,7 @@ func (target *NATSTarget) initNATS() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -420,11 +428,11 @@ func (target *NATSTarget) initNATS() error { // NewNATSTarget - creates new NATS target. func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err) } } @@ -433,12 +441,12 @@ func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTa id: event.TargetID{ID: id, Name: "nats"}, args: args, loggerOnce: loggerOnce, - store: store, + store: queueStore, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index 395d57a1d..82ce09ba0 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -31,6 +31,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -94,7 +95,7 @@ type NSQTarget struct { id event.TargetID args NSQArgs producer *nsq.Producer - store Store + store store.Store[event.Event] config *nsq.Config loggerOnce logger.LogOnce quitCh chan struct{} @@ -105,6 +106,11 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *NSQTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *NSQTarget) Store() event.TargetStore { return target.store @@ -129,8 +135,8 @@ func (target *NSQTarget) isActive() (bool, error) { if err := target.producer.Ping(); err != nil { // To treat "connection refused" errors as errNotConnected. - if IsConnRefusedErr(err) { - return false, errNotConnected + if xnet.IsConnRefusedErr(err) { + return false, store.ErrNotConnected } return false, err } @@ -234,7 +240,7 @@ func (target *NSQTarget) initNSQ() error { err = target.producer.Ping() if err != nil { // To treat "connection refused" errors as errNotConnected. - if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) { target.loggerOnce(context.Background(), err, target.ID().String()) } target.producer.Stop() @@ -246,7 +252,7 @@ func (target *NSQTarget) initNSQ() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -254,11 +260,11 @@ func (target *NSQTarget) initNSQ() error { // NewNSQTarget - creates new NSQ target. func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err) } } @@ -267,12 +273,12 @@ func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarge id: event.TargetID{ID: id, Name: "nsq"}, args: args, loggerOnce: loggerOnce, - store: store, + store: queueStore, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index 02d5d98b4..e91fc7b40 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -34,6 +34,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -145,7 +146,7 @@ type PostgreSQLTarget struct { deleteStmt *sql.Stmt insertStmt *sql.Stmt db *sql.DB - store Store + store store.Store[event.Event] firstPing bool connString string loggerOnce logger.LogOnce @@ -157,6 +158,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *PostgreSQLTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *PostgreSQLTarget) Store() event.TargetStore { return target.store @@ -173,7 +179,7 @@ func (target *PostgreSQLTarget) IsActive() (bool, error) { func (target *PostgreSQLTarget) isActive() (bool, error) { if err := target.db.Ping(); err != nil { if IsConnErr(err) { - return false, errNotConnected + return false, store.ErrNotConnected } return false, err } @@ -198,7 +204,7 @@ func (target *PostgreSQLTarget) Save(eventData event.Event) error { // IsConnErr - To detect a connection error. func IsConnErr(err error) bool { - return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection" + return xnet.IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection" } // send - sends an event to the PostgreSQL. @@ -255,7 +261,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error { if !target.firstPing { if err := target.executeStmts(); err != nil { if IsConnErr(err) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -273,7 +279,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error { if err := target.send(eventData); err != nil { if IsConnErr(err) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -357,7 +363,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error { err = target.db.Ping() if err != nil { - if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) { target.loggerOnce(context.Background(), err, target.ID().String()) } } else { @@ -378,7 +384,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -407,11 +413,11 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn } connStr := strings.Join(params, " ") - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err) } } @@ -420,14 +426,14 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn id: event.TargetID{ID: id, Name: "postgresql"}, args: args, firstPing: false, - store: store, + store: queueStore, connString: connStr, loggerOnce: loggerOnce, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index cfc15a132..40f7ba256 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -31,6 +31,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -122,7 +123,7 @@ type RedisTarget struct { id event.TargetID args RedisArgs pool *redis.Pool - store Store + store store.Store[event.Event] firstPing bool loggerOnce logger.LogOnce quitCh chan struct{} @@ -133,6 +134,11 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *RedisTarget) Name() string { + return target.ID().String() +} + // Store returns any underlying store if set. func (target *RedisTarget) Store() event.TargetStore { return target.store @@ -152,8 +158,8 @@ func (target *RedisTarget) isActive() (bool, error) { _, pingErr := conn.Do("PING") if pingErr != nil { - if IsConnRefusedErr(pingErr) { - return false, errNotConnected + if xnet.IsConnRefusedErr(pingErr) { + return false, store.ErrNotConnected } return false, pingErr } @@ -227,16 +233,16 @@ func (target *RedisTarget) Send(eventKey string) error { _, pingErr := conn.Do("PING") if pingErr != nil { - if IsConnRefusedErr(pingErr) { - return errNotConnected + if xnet.IsConnRefusedErr(pingErr) { + return store.ErrNotConnected } return pingErr } if !target.firstPing { if err := target.args.validateFormat(conn); err != nil { - if IsConnRefusedErr(err) { - return errNotConnected + if xnet.IsConnRefusedErr(err) { + return store.ErrNotConnected } return err } @@ -254,8 +260,8 @@ func (target *RedisTarget) Send(eventKey string) error { } if err := target.send(eventData); err != nil { - if IsConnRefusedErr(err) { - return errNotConnected + if xnet.IsConnRefusedErr(err) { + return store.ErrNotConnected } return err } @@ -280,7 +286,7 @@ func (target *RedisTarget) initRedis() error { _, pingErr := conn.Do("PING") if pingErr != nil { - if !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { + if !(xnet.IsConnRefusedErr(pingErr) || xnet.IsConnResetErr(pingErr)) { target.loggerOnce(context.Background(), pingErr, target.ID().String()) } return pingErr @@ -298,7 +304,7 @@ func (target *RedisTarget) initRedis() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -306,11 +312,11 @@ func (target *RedisTarget) initRedis() error { // NewRedisTarget - creates new Redis target. func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) { - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err) } } @@ -349,13 +355,13 @@ func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*Redi id: event.TargetID{ID: id, Name: "redis"}, args: args, pool: pool, - store: store, + store: queueStore, loggerOnce: loggerOnce, quitCh: make(chan struct{}), } if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + store.StreamItems(target.store, target, target.quitCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/store.go b/internal/event/target/store.go deleted file mode 100644 index 199c21a9e..000000000 --- a/internal/event/target/store.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package target - -import ( - "context" - "errors" - "fmt" - "strings" - "syscall" - "time" - - "github.com/minio/minio/internal/event" - "github.com/minio/minio/internal/logger" -) - -const retryInterval = 3 * time.Second - -// errNotConnected - indicates that the target connection is not active. -var errNotConnected = errors.New("not connected to target server/service") - -// errLimitExceeded error is sent when the maximum limit is reached. -var errLimitExceeded = errors.New("the maximum store limit reached") - -// Store - To persist the events. -type Store interface { - Put(event event.Event) error - Get(key string) (event.Event, error) - Len() int - List() ([]string, error) - Del(key string) error - Open() error -} - -// replayEvents - Reads the events from the store and replays. -func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string { - eventKeyCh := make(chan string) - - go func() { - defer close(eventKeyCh) - - retryTicker := time.NewTicker(retryInterval) - defer retryTicker.Stop() - - for { - names, err := store.List() - if err != nil { - loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id) - } else { - for _, name := range names { - select { - case eventKeyCh <- strings.TrimSuffix(name, eventExt): - // Get next key. - case <-doneCh: - return - } - } - } - - select { - case <-retryTicker.C: - case <-doneCh: - return - } - } - }() - - return eventKeyCh -} - -// IsConnRefusedErr - To check fot "connection refused" error. -func IsConnRefusedErr(err error) bool { - return errors.Is(err, syscall.ECONNREFUSED) -} - -// IsConnResetErr - Checks for connection reset errors. -func IsConnResetErr(err error) bool { - if strings.Contains(err.Error(), "connection reset by peer") { - return true - } - // incase if error message is wrapped. - return errors.Is(err, syscall.ECONNRESET) -} - -// sendEvents - Reads events from the store and re-plays. -func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { - retryTicker := time.NewTicker(retryInterval) - defer retryTicker.Stop() - - send := func(eventKey string) bool { - for { - err := target.Send(eventKey) - if err == nil { - break - } - - if err != errNotConnected && !IsConnResetErr(err) { - loggerOnce(context.Background(), - fmt.Errorf("target.Send() failed with '%w'", err), - target.ID().String()) - } - - // Retrying after 3secs back-off - - select { - case <-retryTicker.C: - case <-doneCh: - return false - } - } - return true - } - - for { - select { - case eventKey, ok := <-eventKeyCh: - if !ok { - // closed channel. - return - } - - if !send(eventKey) { - return - } - case <-doneCh: - return - } - } -} - -func streamEventsFromStore(store Store, target event.Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { - go func() { - // Replays the events from the store. - eventKeyCh := replayEvents(store, doneCh, loggerOnce, target.ID().String()) - // Send events from the store. - sendEvents(target, eventKeyCh, doneCh, loggerOnce) - }() -} diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index 9b16893ad..b7feb9ad1 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -35,6 +35,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/store" "github.com/minio/pkg/certs" xnet "github.com/minio/pkg/net" ) @@ -96,7 +97,7 @@ type WebhookTarget struct { args WebhookArgs transport *http.Transport httpClient *http.Client - store Store + store store.Store[event.Event] loggerOnce logger.LogOnce cancel context.CancelFunc cancelCh <-chan struct{} @@ -107,6 +108,11 @@ func (target *WebhookTarget) ID() event.TargetID { return target.id } +// Name - returns the Name of the target. +func (target *WebhookTarget) Name() string { + return target.ID().String() +} + // IsActive - Return true if target is up and active func (target *WebhookTarget) IsActive() (bool, error) { if err := target.init(); err != nil { @@ -127,7 +133,7 @@ func (target *WebhookTarget) isActive() (bool, error) { req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil) if err != nil { if xnet.IsNetworkOrHostDown(err, false) { - return false, errNotConnected + return false, store.ErrNotConnected } return false, err } @@ -142,7 +148,7 @@ func (target *WebhookTarget) isActive() (bool, error) { resp, err := target.httpClient.Do(req) if err != nil { if xnet.IsNetworkOrHostDown(err, true) { - return false, errNotConnected + return false, store.ErrNotConnected } return false, err } @@ -165,7 +171,7 @@ func (target *WebhookTarget) Save(eventData event.Event) error { err := target.send(eventData) if err != nil { if xnet.IsNetworkOrHostDown(err, false) { - return errNotConnected + return store.ErrNotConnected } } return err @@ -235,7 +241,7 @@ func (target *WebhookTarget) Send(eventKey string) error { if err := target.send(eventData); err != nil { if xnet.IsNetworkOrHostDown(err, false) { - return errNotConnected + return store.ErrNotConnected } return err } @@ -274,7 +280,7 @@ func (target *WebhookTarget) initWebhook() error { return err } if !yes { - return errNotConnected + return store.ErrNotConnected } return nil @@ -284,11 +290,11 @@ func (target *WebhookTarget) initWebhook() error { func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) { ctx, cancel := context.WithCancel(ctx) - var store Store + var queueStore store.Store[event.Event] if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if err := store.Open(); err != nil { + queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension) + if err := queueStore.Open(); err != nil { cancel() return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err) } @@ -299,13 +305,13 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn args: args, loggerOnce: loggerOnce, transport: transport, - store: store, + store: queueStore, cancel: cancel, cancelCh: ctx.Done(), } if target.store != nil { - streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce) + store.StreamItems(target.store, target, target.cancelCh, target.loggerOnce) } return target, nil diff --git a/internal/event/target/queuestore.go b/internal/store/queuestore.go similarity index 65% rename from internal/event/target/queuestore.go rename to internal/store/queuestore.go index 38d9f8922..d4a2968ac 100644 --- a/internal/event/target/queuestore.go +++ b/internal/store/queuestore.go @@ -15,10 +15,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package target +package store import ( "encoding/json" + "errors" "os" "path/filepath" "sort" @@ -26,38 +27,47 @@ import ( "sync" "time" - "github.com/minio/minio/internal/event" + "github.com/google/uuid" ) const ( defaultLimit = 100000 // Default store limit. - eventExt = ".event" + defaultExt = ".unknown" ) -// QueueStore - Filestore for persisting events. -type QueueStore struct { +// errLimitExceeded error is sent when the maximum limit is reached. +var errLimitExceeded = errors.New("the maximum store limit reached") + +// QueueStore - Filestore for persisting items. +type QueueStore[_ any] struct { sync.RWMutex entryLimit uint64 directory string + fileExt string entries map[string]int64 // key -> modtime as unix nano } // NewQueueStore - Creates an instance for QueueStore. -func NewQueueStore(directory string, limit uint64) Store { +func NewQueueStore[I any](directory string, limit uint64, ext string) *QueueStore[I] { if limit == 0 { limit = defaultLimit } - return &QueueStore{ + if ext == "" { + ext = defaultExt + } + + return &QueueStore[I]{ directory: directory, entryLimit: limit, + fileExt: ext, entries: make(map[string]int64, limit), } } // Open - Creates the directory if not present. -func (store *QueueStore) Open() error { +func (store *QueueStore[_]) Open() error { store.Lock() defer store.Unlock() @@ -78,7 +88,7 @@ func (store *QueueStore) Open() error { if file.IsDir() { continue } - key := strings.TrimSuffix(file.Name(), eventExt) + key := strings.TrimSuffix(file.Name(), store.fileExt) if fi, err := file.Info(); err == nil { store.entries[key] = fi.ModTime().UnixNano() } @@ -87,44 +97,45 @@ func (store *QueueStore) Open() error { return nil } -// write - writes event to the directory. -func (store *QueueStore) write(key string, e event.Event) error { - // Marshalls the event. - eventData, err := json.Marshal(e) +// write - writes an item to the directory. +func (store *QueueStore[I]) write(key string, item I) error { + // Marshalls the item. + eventData, err := json.Marshal(item) if err != nil { return err } - path := filepath.Join(store.directory, key+eventExt) + path := filepath.Join(store.directory, key+store.fileExt) if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil { return err } - // Increment the event count. + // Increment the item count. store.entries[key] = time.Now().UnixNano() return nil } -// Put - puts a event to the store. -func (store *QueueStore) Put(e event.Event) error { +// Put - puts an item to the store. +func (store *QueueStore[I]) Put(item I) error { store.Lock() defer store.Unlock() if uint64(len(store.entries)) >= store.entryLimit { return errLimitExceeded } - key, err := getNewUUID() + // Generate a new UUID for the key. + key, err := uuid.NewRandom() if err != nil { return err } - return store.write(key, e) + return store.write(key.String(), item) } -// Get - gets a event from the store. -func (store *QueueStore) Get(key string) (event event.Event, err error) { +// Get - gets an item from the store. +func (store *QueueStore[I]) Get(key string) (item I, err error) { store.RLock() - defer func(store *QueueStore) { + defer func(store *QueueStore[I]) { store.RUnlock() if err != nil { // Upon error we remove the entry. @@ -133,31 +144,31 @@ func (store *QueueStore) Get(key string) (event event.Event, err error) { }(store) var eventData []byte - eventData, err = os.ReadFile(filepath.Join(store.directory, key+eventExt)) + eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) if err != nil { - return event, err + return item, err } if len(eventData) == 0 { - return event, os.ErrNotExist + return item, os.ErrNotExist } - if err = json.Unmarshal(eventData, &event); err != nil { - return event, err + if err = json.Unmarshal(eventData, &item); err != nil { + return item, err } - return event, nil + return item, nil } // Del - Deletes an entry from the store. -func (store *QueueStore) Del(key string) error { +func (store *QueueStore[_]) Del(key string) error { store.Lock() defer store.Unlock() return store.del(key) } // Len returns the entry count. -func (store *QueueStore) Len() int { +func (store *QueueStore[_]) Len() int { store.RLock() l := len(store.entries) defer store.RUnlock() @@ -165,8 +176,8 @@ func (store *QueueStore) Len() int { } // lockless call -func (store *QueueStore) del(key string) error { - err := os.Remove(filepath.Join(store.directory, key+eventExt)) +func (store *QueueStore[_]) del(key string) error { + err := os.Remove(filepath.Join(store.directory, key+store.fileExt)) // Delete as entry no matter the result delete(store.entries, key) @@ -175,7 +186,7 @@ func (store *QueueStore) del(key string) error { } // List - lists all files registered in the store. -func (store *QueueStore) List() ([]string, error) { +func (store *QueueStore[_]) List() ([]string, error) { store.RLock() l := make([]string, 0, len(store.entries)) for k := range store.entries { @@ -194,7 +205,7 @@ func (store *QueueStore) List() ([]string, error) { // list will read all entries from disk. // Entries are returned sorted by modtime, oldest first. // Underlying entry list in store is *not* updated. -func (store *QueueStore) list() ([]os.DirEntry, error) { +func (store *QueueStore[_]) list() ([]os.DirEntry, error) { files, err := os.ReadDir(store.directory) if err != nil { return nil, err @@ -215,3 +226,9 @@ func (store *QueueStore) list() ([]os.DirEntry, error) { return files, nil } + +// Extension will return the file extension used +// for the items stored in the queue. +func (store *QueueStore[_]) Extension() string { + return store.fileExt +} diff --git a/internal/event/target/queuestore_test.go b/internal/store/queuestore_test.go similarity index 62% rename from internal/event/target/queuestore_test.go rename to internal/store/queuestore_test.go index 7c5637d90..680cb177b 100644 --- a/internal/event/target/queuestore_test.go +++ b/internal/store/queuestore_test.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package target +package store import ( "os" @@ -23,48 +23,54 @@ import ( "reflect" "strings" "testing" - - "github.com/minio/minio/internal/event" ) -// TestDir -var queueDir = filepath.Join(os.TempDir(), "minio_test") - -// Sample test event. -var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet} - -// Initialize the store. -func setUpStore(directory string, limit uint64) (Store, error) { - store := NewQueueStore(queueDir, limit) - if oErr := store.Open(); oErr != nil { - return nil, oErr - } - return store, nil +type TestItem struct { + Name string `json:"Name"` + Property string `json:"property"` } -// Tear down store -func tearDownStore() error { +var ( + // TestDir + queueDir = filepath.Join(os.TempDir(), "minio_test") + // Sample test item. + testItem = TestItem{Name: "test-item", Property: "property"} + // Ext for test item + testItemExt = ".test" +) + +// Initialize the queue store. +func setUpQueueStore(directory string, limit uint64) (Store[TestItem], error) { + queueStore := NewQueueStore[TestItem](queueDir, limit, testItemExt) + if oErr := queueStore.Open(); oErr != nil { + return nil, oErr + } + return queueStore, nil +} + +// Tear down queue store. +func tearDownQueueStore() error { return os.RemoveAll(queueDir) } // TestQueueStorePut - tests for store.Put func TestQueueStorePut(t *testing.T) { defer func() { - if err := tearDownStore(); err != nil { + if err := tearDownQueueStore(); err != nil { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 100) + store, err := setUpQueueStore(queueDir, 100) if err != nil { t.Fatal("Failed to create a queue store ", err) } - // Put 100 events. + // Put 100 items. for i := 0; i < 100; i++ { - if err := store.Put(testEvent); err != nil { + if err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - // Count the events. + // Count the items. names, err := store.List() if err != nil { t.Fatal(err) @@ -77,71 +83,71 @@ func TestQueueStorePut(t *testing.T) { // TestQueueStoreGet - tests for store.Get func TestQueueStoreGet(t *testing.T) { defer func() { - if err := tearDownStore(); err != nil { + if err := tearDownQueueStore(); err != nil { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 10) + store, err := setUpQueueStore(queueDir, 10) if err != nil { t.Fatal("Failed to create a queue store ", err) } - // Put 10 events + // Put 10 items for i := 0; i < 10; i++ { - if err := store.Put(testEvent); err != nil { + if err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - eventKeys, err := store.List() + itemKeys, err := store.List() if err != nil { t.Fatal(err) } - // Get 10 events. - if len(eventKeys) == 10 { - for _, key := range eventKeys { - event, eErr := store.Get(strings.TrimSuffix(key, eventExt)) + // Get 10 items. + if len(itemKeys) == 10 { + for _, key := range itemKeys { + item, eErr := store.Get(strings.TrimSuffix(key, testItemExt)) if eErr != nil { - t.Fatal("Failed to Get the event from the queue store ", eErr) + t.Fatal("Failed to Get the item from the queue store ", eErr) } - if !reflect.DeepEqual(testEvent, event) { - t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event) + if !reflect.DeepEqual(testItem, item) { + t.Fatalf("Failed to read the item: error: expected = %v, got = %v", testItem, item) } } } else { - t.Fatalf("List() Expected: 10, got %d", len(eventKeys)) + t.Fatalf("List() Expected: 10, got %d", len(itemKeys)) } } // TestQueueStoreDel - tests for store.Del func TestQueueStoreDel(t *testing.T) { defer func() { - if err := tearDownStore(); err != nil { + if err := tearDownQueueStore(); err != nil { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 20) + store, err := setUpQueueStore(queueDir, 20) if err != nil { t.Fatal("Failed to create a queue store ", err) } - // Put 20 events. + // Put 20 items. for i := 0; i < 20; i++ { - if err := store.Put(testEvent); err != nil { + if err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - eventKeys, err := store.List() + itemKeys, err := store.List() if err != nil { t.Fatal(err) } - // Remove all the events. - if len(eventKeys) == 20 { - for _, key := range eventKeys { - err := store.Del(strings.TrimSuffix(key, eventExt)) + // Remove all the items. + if len(itemKeys) == 20 { + for _, key := range itemKeys { + err := store.Del(strings.TrimSuffix(key, testItemExt)) if err != nil { t.Fatal("queue store Del failed with ", err) } } } else { - t.Fatalf("List() Expected: 20, got %d", len(eventKeys)) + t.Fatalf("List() Expected: 20, got %d", len(itemKeys)) } names, err := store.List() @@ -153,25 +159,25 @@ func TestQueueStoreDel(t *testing.T) { } } -// TestQueueStoreLimit - tests the event limit for the store. +// TestQueueStoreLimit - tests the item limit for the store. func TestQueueStoreLimit(t *testing.T) { defer func() { - if err := tearDownStore(); err != nil { + if err := tearDownQueueStore(); err != nil { t.Fatal("Failed to tear down store ", err) } }() // The max limit is set to 5. - store, err := setUpStore(queueDir, 5) + store, err := setUpQueueStore(queueDir, 5) if err != nil { t.Fatal("Failed to create a queue store ", err) } for i := 0; i < 5; i++ { - if err := store.Put(testEvent); err != nil { + if err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } // Should not allow 6th Put. - if err := store.Put(testEvent); err == nil { + if err := store.Put(testItem); err == nil { t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded) } } @@ -179,20 +185,20 @@ func TestQueueStoreLimit(t *testing.T) { // TestQueueStoreLimit - tests for store.LimitN. func TestQueueStoreListN(t *testing.T) { defer func() { - if err := tearDownStore(); err != nil { + if err := tearDownQueueStore(); err != nil { t.Fatal("Failed to tear down store ", err) } }() - store, err := setUpStore(queueDir, 10) + store, err := setUpQueueStore(queueDir, 10) if err != nil { t.Fatal("Failed to create a queue store ", err) } for i := 0; i < 10; i++ { - if err := store.Put(testEvent); err != nil { + if err := store.Put(testItem); err != nil { t.Fatal("Failed to put to queue store ", err) } } - // Should return all the event keys in the store. + // Should return all the item keys in the store. names, err := store.List() if err != nil { t.Fatal(err) @@ -203,7 +209,7 @@ func TestQueueStoreListN(t *testing.T) { } // re-open - store, err = setUpStore(queueDir, 10) + store, err = setUpQueueStore(queueDir, 10) if err != nil { t.Fatal("Failed to create a queue store ", err) } diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 000000000..b8428a71f --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,145 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package store + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/minio/minio/internal/logger" + xnet "github.com/minio/pkg/net" +) + +const ( + retryInterval = 3 * time.Second +) + +// ErrNotConnected - indicates that the target connection is not active. +var ErrNotConnected = errors.New("not connected to target server/service") + +// Target - store target interface +type Target interface { + Name() string + Send(key string) error +} + +// Store - Used to persist items. +type Store[I any] interface { + Put(item I) error + Get(key string) (I, error) + Len() int + List() ([]string, error) + Del(key string) error + Open() error + Extension() string +} + +// replayItems - Reads the items from the store and replays. +func replayItems[I any](store Store[I], doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string { + itemKeyCh := make(chan string) + + go func() { + defer close(itemKeyCh) + + retryTicker := time.NewTicker(retryInterval) + defer retryTicker.Stop() + + for { + names, err := store.List() + if err != nil { + loggerOnce(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) + } else { + for _, name := range names { + select { + case itemKeyCh <- strings.TrimSuffix(name, store.Extension()): + // Get next key. + case <-doneCh: + return + } + } + } + + select { + case <-retryTicker.C: + case <-doneCh: + return + } + } + }() + + return itemKeyCh +} + +// sendItems - Reads items from the store and re-plays. +func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { + retryTicker := time.NewTicker(retryInterval) + defer retryTicker.Stop() + + send := func(itemKey string) bool { + for { + err := target.Send(itemKey) + if err == nil { + break + } + + if err != ErrNotConnected && !xnet.IsConnResetErr(err) { + loggerOnce(context.Background(), + fmt.Errorf("target.Send() failed with '%w'", err), + target.Name()) + } + + // Retrying after 3secs back-off + + select { + case <-retryTicker.C: + case <-doneCh: + return false + } + } + return true + } + + for { + select { + case itemKey, ok := <-itemKeyCh: + if !ok { + // closed channel. + return + } + + if !send(itemKey) { + return + } + case <-doneCh: + return + } + } +} + +// StreamItems reads the keys from the store and replays the corresponding item to the target. +func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { + go func() { + // Replays the items from the store. + itemKeyCh := replayItems(store, doneCh, loggerOnce, target.Name()) + // Send items from the store. + sendItems(target, itemKeyCh, doneCh, loggerOnce) + }() +}