mirror of
https://github.com/minio/minio.git
synced 2025-04-30 14:47:10 -04:00
Generalize the event store using go generics (#16910)
This commit is contained in:
parent
6e359c586e
commit
51f7f9aaa3
2
go.mod
2
go.mod
@ -50,7 +50,7 @@ require (
|
|||||||
github.com/minio/madmin-go/v2 v2.0.17
|
github.com/minio/madmin-go/v2 v2.0.17
|
||||||
github.com/minio/minio-go/v7 v7.0.50
|
github.com/minio/minio-go/v7 v7.0.50
|
||||||
github.com/minio/mux v1.9.0
|
github.com/minio/mux v1.9.0
|
||||||
github.com/minio/pkg v1.6.5
|
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c
|
||||||
github.com/minio/selfupdate v0.6.0
|
github.com/minio/selfupdate v0.6.0
|
||||||
github.com/minio/sha256-simd v1.0.0
|
github.com/minio/sha256-simd v1.0.0
|
||||||
github.com/minio/simdjson-go v0.4.5
|
github.com/minio/simdjson-go v0.4.5
|
||||||
|
4
go.sum
4
go.sum
@ -786,8 +786,8 @@ github.com/minio/minio-go/v7 v7.0.50/go.mod h1:IbbodHyjUAguneyucUaahv+VMNs/EOTV9
|
|||||||
github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA=
|
github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA=
|
||||||
github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ=
|
github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ=
|
||||||
github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA=
|
github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA=
|
||||||
github.com/minio/pkg v1.6.5 h1:T9cRNcCLJTFFgQGH0Rzr1CtAWLAIchTsbE0lSztCf40=
|
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c h1:Ukw0+d0T/+9lserJfodr4HGIIb3hLkt8tlgMh2vUfZg=
|
||||||
github.com/minio/pkg v1.6.5/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
|
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
|
||||||
github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU=
|
github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU=
|
||||||
github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM=
|
github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM=
|
||||||
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
|
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
|
||||||
|
@ -30,6 +30,9 @@ const (
|
|||||||
|
|
||||||
// AMZTimeFormat - event time format.
|
// AMZTimeFormat - event time format.
|
||||||
AMZTimeFormat = "2006-01-02T15:04:05.000Z"
|
AMZTimeFormat = "2006-01-02T15:04:05.000Z"
|
||||||
|
|
||||||
|
// StoreExtension - file extension of an event file in store
|
||||||
|
StoreExtension = ".event"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Identity represents access key who caused the event.
|
// Identity represents access key who caused the event.
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
"github.com/rabbitmq/amqp091-go"
|
"github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
@ -117,7 +118,7 @@ type AMQPTarget struct {
|
|||||||
args AMQPArgs
|
args AMQPArgs
|
||||||
conn *amqp091.Connection
|
conn *amqp091.Connection
|
||||||
connMutex sync.Mutex
|
connMutex sync.Mutex
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
|
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
@ -128,6 +129,11 @@ func (target *AMQPTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *AMQPTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *AMQPTarget) Store() event.TargetStore {
|
func (target *AMQPTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -197,8 +203,8 @@ func (target *AMQPTarget) channel() (*amqp091.Channel, chan amqp091.Confirmation
|
|||||||
|
|
||||||
conn, err = amqp091.Dial(target.args.URL.String())
|
conn, err = amqp091.Dial(target.args.URL.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if IsConnRefusedErr(err) {
|
if xnet.IsConnRefusedErr(err) {
|
||||||
return nil, nil, errNotConnected
|
return nil, nil, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -330,7 +336,7 @@ func (target *AMQPTarget) init() error {
|
|||||||
func (target *AMQPTarget) initAMQP() error {
|
func (target *AMQPTarget) initAMQP() error {
|
||||||
conn, err := amqp091.Dial(target.args.URL.String())
|
conn, err := amqp091.Dial(target.args.URL.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if IsConnRefusedErr(err) || IsConnResetErr(err) {
|
if xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err) {
|
||||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -342,11 +348,11 @@ func (target *AMQPTarget) initAMQP() error {
|
|||||||
|
|
||||||
// NewAMQPTarget - creates new AMQP target.
|
// NewAMQPTarget - creates new AMQP target.
|
||||||
func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) {
|
func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -355,12 +361,12 @@ func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTa
|
|||||||
id: event.TargetID{ID: id, Name: "amqp"},
|
id: event.TargetID{ID: id, Name: "amqp"},
|
||||||
args: args,
|
args: args,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
store: store,
|
store: queueStore,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -1,29 +0,0 @@
|
|||||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
||||||
//
|
|
||||||
// This file is part of MinIO Object Storage stack
|
|
||||||
//
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package target
|
|
||||||
|
|
||||||
import "github.com/google/uuid"
|
|
||||||
|
|
||||||
func getNewUUID() (string, error) {
|
|
||||||
u, err := uuid.NewRandom()
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return u.String(), nil
|
|
||||||
}
|
|
@ -36,6 +36,7 @@ import (
|
|||||||
"github.com/minio/highwayhash"
|
"github.com/minio/highwayhash"
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
@ -157,7 +158,7 @@ type ElasticsearchTarget struct {
|
|||||||
id event.TargetID
|
id event.TargetID
|
||||||
args ElasticsearchArgs
|
args ElasticsearchArgs
|
||||||
client esClient
|
client esClient
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
}
|
}
|
||||||
@ -167,6 +168,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *ElasticsearchTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *ElasticsearchTarget) Store() event.TargetStore {
|
func (target *ElasticsearchTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -212,7 +218,7 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error {
|
|||||||
|
|
||||||
err = target.send(eventData)
|
err = target.send(eventData)
|
||||||
if xnet.IsNetworkOrHostDown(err, false) {
|
if xnet.IsNetworkOrHostDown(err, false) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -284,7 +290,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {
|
|||||||
|
|
||||||
if err := target.send(eventData); err != nil {
|
if err := target.send(eventData); err != nil {
|
||||||
if xnet.IsNetworkOrHostDown(err, false) {
|
if xnet.IsNetworkOrHostDown(err, false) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -348,7 +354,7 @@ func (target *ElasticsearchTarget) initElasticsearch() error {
|
|||||||
|
|
||||||
err := target.checkAndInitClient(ctx)
|
err := target.checkAndInitClient(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != errNotConnected {
|
if err != store.ErrNotConnected {
|
||||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -359,11 +365,11 @@ func (target *ElasticsearchTarget) initElasticsearch() error {
|
|||||||
|
|
||||||
// NewElasticsearchTarget - creates new Elasticsearch target.
|
// NewElasticsearchTarget - creates new Elasticsearch target.
|
||||||
func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) {
|
func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -371,13 +377,13 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger
|
|||||||
target := &ElasticsearchTarget{
|
target := &ElasticsearchTarget{
|
||||||
id: event.TargetID{ID: id, Name: "elasticsearch"},
|
id: event.TargetID{ID: id, Name: "elasticsearch"},
|
||||||
args: args,
|
args: args,
|
||||||
store: store,
|
store: queueStore,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
@ -415,7 +421,7 @@ func (c *esClientV7) getServerSupportStatus(ctx context.Context) (ESSupportStatu
|
|||||||
c.Info.WithContext(ctx),
|
c.Info.WithContext(ctx),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ESSUnknown, "", errNotConnected
|
return ESSUnknown, "", store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
@ -485,7 +491,7 @@ func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error
|
|||||||
c.Ping.WithContext(ctx),
|
c.Ping.WithContext(ctx),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
io.Copy(io.Discard, resp.Body)
|
io.Copy(io.Discard, resp.Body)
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
@ -129,7 +130,7 @@ type KafkaTarget struct {
|
|||||||
args KafkaArgs
|
args KafkaArgs
|
||||||
producer sarama.SyncProducer
|
producer sarama.SyncProducer
|
||||||
config *sarama.Config
|
config *sarama.Config
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
}
|
}
|
||||||
@ -139,6 +140,11 @@ func (target *KafkaTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *KafkaTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *KafkaTarget) Store() event.TargetStore {
|
func (target *KafkaTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -154,7 +160,7 @@ func (target *KafkaTarget) IsActive() (bool, error) {
|
|||||||
|
|
||||||
func (target *KafkaTarget) isActive() (bool, error) {
|
func (target *KafkaTarget) isActive() (bool, error) {
|
||||||
if !target.args.pingBrokers() {
|
if !target.args.pingBrokers() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
@ -178,7 +184,7 @@ func (target *KafkaTarget) Save(eventData event.Event) error {
|
|||||||
// send - sends an event to the kafka.
|
// send - sends an event to the kafka.
|
||||||
func (target *KafkaTarget) send(eventData event.Event) error {
|
func (target *KafkaTarget) send(eventData event.Event) error {
|
||||||
if target.producer == nil {
|
if target.producer == nil {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
|
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -224,7 +230,7 @@ func (target *KafkaTarget) Send(eventKey string) error {
|
|||||||
if err != sarama.ErrOutOfBrokers {
|
if err != sarama.ErrOutOfBrokers {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,7 +248,7 @@ func (target *KafkaTarget) Send(eventKey string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// Sarama opens the ciruit breaker after 3 consecutive connection failures.
|
// Sarama opens the ciruit breaker after 3 consecutive connection failures.
|
||||||
if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" {
|
if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -330,7 +336,7 @@ func (target *KafkaTarget) initKafka() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -338,11 +344,11 @@ func (target *KafkaTarget) initKafka() error {
|
|||||||
|
|
||||||
// NewKafkaTarget - creates new Kafka target with auth credentials.
|
// NewKafkaTarget - creates new Kafka target with auth credentials.
|
||||||
func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) {
|
func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,13 +356,13 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
|
|||||||
target := &KafkaTarget{
|
target := &KafkaTarget{
|
||||||
id: event.TargetID{ID: id, Name: "kafka"},
|
id: event.TargetID{ID: id, Name: "kafka"},
|
||||||
args: args,
|
args: args,
|
||||||
store: store,
|
store: queueStore,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -111,7 +112,7 @@ type MQTTTarget struct {
|
|||||||
id event.TargetID
|
id event.TargetID
|
||||||
args MQTTArgs
|
args MQTTArgs
|
||||||
client mqtt.Client
|
client mqtt.Client
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
}
|
}
|
||||||
@ -121,6 +122,11 @@ func (target *MQTTTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *MQTTTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *MQTTTarget) Store() event.TargetStore {
|
func (target *MQTTTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -136,7 +142,7 @@ func (target *MQTTTarget) IsActive() (bool, error) {
|
|||||||
|
|
||||||
func (target *MQTTTarget) isActive() (bool, error) {
|
func (target *MQTTTarget) isActive() (bool, error) {
|
||||||
if !target.client.IsConnectionOpen() {
|
if !target.client.IsConnectionOpen() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
@ -156,7 +162,7 @@ func (target *MQTTTarget) send(eventData event.Event) error {
|
|||||||
|
|
||||||
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
|
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
|
||||||
if !token.WaitTimeout(reconnectInterval) {
|
if !token.WaitTimeout(reconnectInterval) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return token.Error()
|
return token.Error()
|
||||||
}
|
}
|
||||||
@ -245,7 +251,7 @@ func (target *MQTTTarget) initMQTT() error {
|
|||||||
token := target.client.Connect()
|
token := target.client.Connect()
|
||||||
ok := token.WaitTimeout(reconnectInterval)
|
ok := token.WaitTimeout(reconnectInterval)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
if token.Error() != nil {
|
if token.Error() != nil {
|
||||||
return token.Error()
|
return token.Error()
|
||||||
@ -256,7 +262,7 @@ func (target *MQTTTarget) initMQTT() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -274,11 +280,11 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa
|
|||||||
args.KeepAlive = 10 * time.Second
|
args.KeepAlive = 10 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -286,13 +292,13 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa
|
|||||||
target := &MQTTTarget{
|
target := &MQTTTarget{
|
||||||
id: event.TargetID{ID: id, Name: "mqtt"},
|
id: event.TargetID{ID: id, Name: "mqtt"},
|
||||||
args: args,
|
args: args,
|
||||||
store: store,
|
store: queueStore,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/go-sql-driver/mysql"
|
"github.com/go-sql-driver/mysql"
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -153,7 +154,7 @@ type MySQLTarget struct {
|
|||||||
deleteStmt *sql.Stmt
|
deleteStmt *sql.Stmt
|
||||||
insertStmt *sql.Stmt
|
insertStmt *sql.Stmt
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
firstPing bool
|
firstPing bool
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
|
|
||||||
@ -165,6 +166,11 @@ func (target *MySQLTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *MySQLTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *MySQLTarget) Store() event.TargetStore {
|
func (target *MySQLTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -181,7 +187,7 @@ func (target *MySQLTarget) IsActive() (bool, error) {
|
|||||||
func (target *MySQLTarget) isActive() (bool, error) {
|
func (target *MySQLTarget) isActive() (bool, error) {
|
||||||
if err := target.db.Ping(); err != nil {
|
if err := target.db.Ping(); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -260,7 +266,7 @@ func (target *MySQLTarget) Send(eventKey string) error {
|
|||||||
if !target.firstPing {
|
if !target.firstPing {
|
||||||
if err := target.executeStmts(); err != nil {
|
if err := target.executeStmts(); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -278,7 +284,7 @@ func (target *MySQLTarget) Send(eventKey string) error {
|
|||||||
|
|
||||||
if err := target.send(eventData); err != nil {
|
if err := target.send(eventData); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -363,7 +369,7 @@ func (target *MySQLTarget) initMySQL() error {
|
|||||||
|
|
||||||
err = target.db.Ping()
|
err = target.db.Ping()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
|
||||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -384,7 +390,7 @@ func (target *MySQLTarget) initMySQL() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -392,11 +398,11 @@ func (target *MySQLTarget) initMySQL() error {
|
|||||||
|
|
||||||
// NewMySQLTarget - creates new MySQL target.
|
// NewMySQLTarget - creates new MySQL target.
|
||||||
func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) {
|
func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -419,13 +425,13 @@ func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQ
|
|||||||
id: event.TargetID{ID: id, Name: "mysql"},
|
id: event.TargetID{ID: id, Name: "mysql"},
|
||||||
args: args,
|
args: args,
|
||||||
firstPing: false,
|
firstPing: false,
|
||||||
store: store,
|
store: queueStore,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -28,8 +28,10 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/nats-io/stan.go"
|
"github.com/nats-io/stan.go"
|
||||||
@ -198,10 +200,11 @@ func (n NATSArgs) connectStan() (stan.Conn, error) {
|
|||||||
addressURL = scheme + "://" + n.Address.String()
|
addressURL = scheme + "://" + n.Address.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
clientID, err := getNewUUID()
|
u, err := uuid.NewRandom()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
clientID := u.String()
|
||||||
|
|
||||||
connOpts := []stan.Option{stan.NatsURL(addressURL)}
|
connOpts := []stan.Option{stan.NatsURL(addressURL)}
|
||||||
if n.Streaming.MaxPubAcksInflight > 0 {
|
if n.Streaming.MaxPubAcksInflight > 0 {
|
||||||
@ -220,7 +223,7 @@ type NATSTarget struct {
|
|||||||
natsConn *nats.Conn
|
natsConn *nats.Conn
|
||||||
stanConn stan.Conn
|
stanConn stan.Conn
|
||||||
jstream nats.JetStream
|
jstream nats.JetStream
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
}
|
}
|
||||||
@ -230,6 +233,11 @@ func (target *NATSTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *NATSTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *NATSTarget) Store() event.TargetStore {
|
func (target *NATSTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -249,19 +257,19 @@ func (target *NATSTarget) isActive() (bool, error) {
|
|||||||
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
|
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
|
||||||
target.stanConn, connErr = target.args.connectStan()
|
target.stanConn, connErr = target.args.connectStan()
|
||||||
} else if !target.stanConn.NatsConn().IsConnected() {
|
} else if !target.stanConn.NatsConn().IsConnected() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if target.natsConn == nil {
|
if target.natsConn == nil {
|
||||||
target.natsConn, connErr = target.args.connectNats()
|
target.natsConn, connErr = target.args.connectNats()
|
||||||
} else if !target.natsConn.IsConnected() {
|
} else if !target.natsConn.IsConnected() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if connErr != nil {
|
if connErr != nil {
|
||||||
if connErr.Error() == nats.ErrNoServers.Error() {
|
if connErr.Error() == nats.ErrNoServers.Error() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, connErr
|
return false, connErr
|
||||||
}
|
}
|
||||||
@ -270,7 +278,7 @@ func (target *NATSTarget) isActive() (bool, error) {
|
|||||||
target.jstream, connErr = target.natsConn.JetStream()
|
target.jstream, connErr = target.natsConn.JetStream()
|
||||||
if connErr != nil {
|
if connErr != nil {
|
||||||
if connErr.Error() == nats.ErrNoServers.Error() {
|
if connErr.Error() == nats.ErrNoServers.Error() {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, connErr
|
return false, connErr
|
||||||
}
|
}
|
||||||
@ -412,7 +420,7 @@ func (target *NATSTarget) initNATS() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -420,11 +428,11 @@ func (target *NATSTarget) initNATS() error {
|
|||||||
|
|
||||||
// NewNATSTarget - creates new NATS target.
|
// NewNATSTarget - creates new NATS target.
|
||||||
func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) {
|
func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -433,12 +441,12 @@ func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTa
|
|||||||
id: event.TargetID{ID: id, Name: "nats"},
|
id: event.TargetID{ID: id, Name: "nats"},
|
||||||
args: args,
|
args: args,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
store: store,
|
store: queueStore,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -94,7 +95,7 @@ type NSQTarget struct {
|
|||||||
id event.TargetID
|
id event.TargetID
|
||||||
args NSQArgs
|
args NSQArgs
|
||||||
producer *nsq.Producer
|
producer *nsq.Producer
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
config *nsq.Config
|
config *nsq.Config
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
@ -105,6 +106,11 @@ func (target *NSQTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *NSQTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *NSQTarget) Store() event.TargetStore {
|
func (target *NSQTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -129,8 +135,8 @@ func (target *NSQTarget) isActive() (bool, error) {
|
|||||||
|
|
||||||
if err := target.producer.Ping(); err != nil {
|
if err := target.producer.Ping(); err != nil {
|
||||||
// To treat "connection refused" errors as errNotConnected.
|
// To treat "connection refused" errors as errNotConnected.
|
||||||
if IsConnRefusedErr(err) {
|
if xnet.IsConnRefusedErr(err) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -234,7 +240,7 @@ func (target *NSQTarget) initNSQ() error {
|
|||||||
err = target.producer.Ping()
|
err = target.producer.Ping()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// To treat "connection refused" errors as errNotConnected.
|
// To treat "connection refused" errors as errNotConnected.
|
||||||
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
|
||||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||||
}
|
}
|
||||||
target.producer.Stop()
|
target.producer.Stop()
|
||||||
@ -246,7 +252,7 @@ func (target *NSQTarget) initNSQ() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -254,11 +260,11 @@ func (target *NSQTarget) initNSQ() error {
|
|||||||
|
|
||||||
// NewNSQTarget - creates new NSQ target.
|
// NewNSQTarget - creates new NSQ target.
|
||||||
func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) {
|
func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -267,12 +273,12 @@ func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarge
|
|||||||
id: event.TargetID{ID: id, Name: "nsq"},
|
id: event.TargetID{ID: id, Name: "nsq"},
|
||||||
args: args,
|
args: args,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
store: store,
|
store: queueStore,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -145,7 +146,7 @@ type PostgreSQLTarget struct {
|
|||||||
deleteStmt *sql.Stmt
|
deleteStmt *sql.Stmt
|
||||||
insertStmt *sql.Stmt
|
insertStmt *sql.Stmt
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
firstPing bool
|
firstPing bool
|
||||||
connString string
|
connString string
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
@ -157,6 +158,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *PostgreSQLTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *PostgreSQLTarget) Store() event.TargetStore {
|
func (target *PostgreSQLTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -173,7 +179,7 @@ func (target *PostgreSQLTarget) IsActive() (bool, error) {
|
|||||||
func (target *PostgreSQLTarget) isActive() (bool, error) {
|
func (target *PostgreSQLTarget) isActive() (bool, error) {
|
||||||
if err := target.db.Ping(); err != nil {
|
if err := target.db.Ping(); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -198,7 +204,7 @@ func (target *PostgreSQLTarget) Save(eventData event.Event) error {
|
|||||||
|
|
||||||
// IsConnErr - To detect a connection error.
|
// IsConnErr - To detect a connection error.
|
||||||
func IsConnErr(err error) bool {
|
func IsConnErr(err error) bool {
|
||||||
return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
|
return xnet.IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
|
||||||
}
|
}
|
||||||
|
|
||||||
// send - sends an event to the PostgreSQL.
|
// send - sends an event to the PostgreSQL.
|
||||||
@ -255,7 +261,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error {
|
|||||||
if !target.firstPing {
|
if !target.firstPing {
|
||||||
if err := target.executeStmts(); err != nil {
|
if err := target.executeStmts(); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -273,7 +279,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error {
|
|||||||
|
|
||||||
if err := target.send(eventData); err != nil {
|
if err := target.send(eventData); err != nil {
|
||||||
if IsConnErr(err) {
|
if IsConnErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -357,7 +363,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error {
|
|||||||
|
|
||||||
err = target.db.Ping()
|
err = target.db.Ping()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
if !(xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err)) {
|
||||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -378,7 +384,7 @@ func (target *PostgreSQLTarget) initPostgreSQL() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -407,11 +413,11 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
|
|||||||
}
|
}
|
||||||
connStr := strings.Join(params, " ")
|
connStr := strings.Join(params, " ")
|
||||||
|
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -420,14 +426,14 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn
|
|||||||
id: event.TargetID{ID: id, Name: "postgresql"},
|
id: event.TargetID{ID: id, Name: "postgresql"},
|
||||||
args: args,
|
args: args,
|
||||||
firstPing: false,
|
firstPing: false,
|
||||||
store: store,
|
store: queueStore,
|
||||||
connString: connStr,
|
connString: connStr,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/gomodule/redigo/redis"
|
"github.com/gomodule/redigo/redis"
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -122,7 +123,7 @@ type RedisTarget struct {
|
|||||||
id event.TargetID
|
id event.TargetID
|
||||||
args RedisArgs
|
args RedisArgs
|
||||||
pool *redis.Pool
|
pool *redis.Pool
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
firstPing bool
|
firstPing bool
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
quitCh chan struct{}
|
quitCh chan struct{}
|
||||||
@ -133,6 +134,11 @@ func (target *RedisTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *RedisTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// Store returns any underlying store if set.
|
// Store returns any underlying store if set.
|
||||||
func (target *RedisTarget) Store() event.TargetStore {
|
func (target *RedisTarget) Store() event.TargetStore {
|
||||||
return target.store
|
return target.store
|
||||||
@ -152,8 +158,8 @@ func (target *RedisTarget) isActive() (bool, error) {
|
|||||||
|
|
||||||
_, pingErr := conn.Do("PING")
|
_, pingErr := conn.Do("PING")
|
||||||
if pingErr != nil {
|
if pingErr != nil {
|
||||||
if IsConnRefusedErr(pingErr) {
|
if xnet.IsConnRefusedErr(pingErr) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, pingErr
|
return false, pingErr
|
||||||
}
|
}
|
||||||
@ -227,16 +233,16 @@ func (target *RedisTarget) Send(eventKey string) error {
|
|||||||
|
|
||||||
_, pingErr := conn.Do("PING")
|
_, pingErr := conn.Do("PING")
|
||||||
if pingErr != nil {
|
if pingErr != nil {
|
||||||
if IsConnRefusedErr(pingErr) {
|
if xnet.IsConnRefusedErr(pingErr) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return pingErr
|
return pingErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if !target.firstPing {
|
if !target.firstPing {
|
||||||
if err := target.args.validateFormat(conn); err != nil {
|
if err := target.args.validateFormat(conn); err != nil {
|
||||||
if IsConnRefusedErr(err) {
|
if xnet.IsConnRefusedErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -254,8 +260,8 @@ func (target *RedisTarget) Send(eventKey string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := target.send(eventData); err != nil {
|
if err := target.send(eventData); err != nil {
|
||||||
if IsConnRefusedErr(err) {
|
if xnet.IsConnRefusedErr(err) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -280,7 +286,7 @@ func (target *RedisTarget) initRedis() error {
|
|||||||
|
|
||||||
_, pingErr := conn.Do("PING")
|
_, pingErr := conn.Do("PING")
|
||||||
if pingErr != nil {
|
if pingErr != nil {
|
||||||
if !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
|
if !(xnet.IsConnRefusedErr(pingErr) || xnet.IsConnResetErr(pingErr)) {
|
||||||
target.loggerOnce(context.Background(), pingErr, target.ID().String())
|
target.loggerOnce(context.Background(), pingErr, target.ID().String())
|
||||||
}
|
}
|
||||||
return pingErr
|
return pingErr
|
||||||
@ -298,7 +304,7 @@ func (target *RedisTarget) initRedis() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -306,11 +312,11 @@ func (target *RedisTarget) initRedis() error {
|
|||||||
|
|
||||||
// NewRedisTarget - creates new Redis target.
|
// NewRedisTarget - creates new Redis target.
|
||||||
func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) {
|
func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) {
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -349,13 +355,13 @@ func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*Redi
|
|||||||
id: event.TargetID{ID: id, Name: "redis"},
|
id: event.TargetID{ID: id, Name: "redis"},
|
||||||
args: args,
|
args: args,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
store: store,
|
store: queueStore,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -1,153 +0,0 @@
|
|||||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
||||||
//
|
|
||||||
// This file is part of MinIO Object Storage stack
|
|
||||||
//
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package target
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
|
||||||
"github.com/minio/minio/internal/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
const retryInterval = 3 * time.Second
|
|
||||||
|
|
||||||
// errNotConnected - indicates that the target connection is not active.
|
|
||||||
var errNotConnected = errors.New("not connected to target server/service")
|
|
||||||
|
|
||||||
// errLimitExceeded error is sent when the maximum limit is reached.
|
|
||||||
var errLimitExceeded = errors.New("the maximum store limit reached")
|
|
||||||
|
|
||||||
// Store - To persist the events.
|
|
||||||
type Store interface {
|
|
||||||
Put(event event.Event) error
|
|
||||||
Get(key string) (event.Event, error)
|
|
||||||
Len() int
|
|
||||||
List() ([]string, error)
|
|
||||||
Del(key string) error
|
|
||||||
Open() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// replayEvents - Reads the events from the store and replays.
|
|
||||||
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string {
|
|
||||||
eventKeyCh := make(chan string)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(eventKeyCh)
|
|
||||||
|
|
||||||
retryTicker := time.NewTicker(retryInterval)
|
|
||||||
defer retryTicker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
names, err := store.List()
|
|
||||||
if err != nil {
|
|
||||||
loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id)
|
|
||||||
} else {
|
|
||||||
for _, name := range names {
|
|
||||||
select {
|
|
||||||
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
|
|
||||||
// Get next key.
|
|
||||||
case <-doneCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-retryTicker.C:
|
|
||||||
case <-doneCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return eventKeyCh
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsConnRefusedErr - To check fot "connection refused" error.
|
|
||||||
func IsConnRefusedErr(err error) bool {
|
|
||||||
return errors.Is(err, syscall.ECONNREFUSED)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsConnResetErr - Checks for connection reset errors.
|
|
||||||
func IsConnResetErr(err error) bool {
|
|
||||||
if strings.Contains(err.Error(), "connection reset by peer") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
// incase if error message is wrapped.
|
|
||||||
return errors.Is(err, syscall.ECONNRESET)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendEvents - Reads events from the store and re-plays.
|
|
||||||
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
|
|
||||||
retryTicker := time.NewTicker(retryInterval)
|
|
||||||
defer retryTicker.Stop()
|
|
||||||
|
|
||||||
send := func(eventKey string) bool {
|
|
||||||
for {
|
|
||||||
err := target.Send(eventKey)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != errNotConnected && !IsConnResetErr(err) {
|
|
||||||
loggerOnce(context.Background(),
|
|
||||||
fmt.Errorf("target.Send() failed with '%w'", err),
|
|
||||||
target.ID().String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrying after 3secs back-off
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-retryTicker.C:
|
|
||||||
case <-doneCh:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case eventKey, ok := <-eventKeyCh:
|
|
||||||
if !ok {
|
|
||||||
// closed channel.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !send(eventKey) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-doneCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func streamEventsFromStore(store Store, target event.Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
|
|
||||||
go func() {
|
|
||||||
// Replays the events from the store.
|
|
||||||
eventKeyCh := replayEvents(store, doneCh, loggerOnce, target.ID().String())
|
|
||||||
// Send events from the store.
|
|
||||||
sendEvents(target, eventKeyCh, doneCh, loggerOnce)
|
|
||||||
}()
|
|
||||||
}
|
|
@ -35,6 +35,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/minio/minio/internal/event"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/minio/internal/store"
|
||||||
"github.com/minio/pkg/certs"
|
"github.com/minio/pkg/certs"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
@ -96,7 +97,7 @@ type WebhookTarget struct {
|
|||||||
args WebhookArgs
|
args WebhookArgs
|
||||||
transport *http.Transport
|
transport *http.Transport
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
store Store
|
store store.Store[event.Event]
|
||||||
loggerOnce logger.LogOnce
|
loggerOnce logger.LogOnce
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
cancelCh <-chan struct{}
|
cancelCh <-chan struct{}
|
||||||
@ -107,6 +108,11 @@ func (target *WebhookTarget) ID() event.TargetID {
|
|||||||
return target.id
|
return target.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name - returns the Name of the target.
|
||||||
|
func (target *WebhookTarget) Name() string {
|
||||||
|
return target.ID().String()
|
||||||
|
}
|
||||||
|
|
||||||
// IsActive - Return true if target is up and active
|
// IsActive - Return true if target is up and active
|
||||||
func (target *WebhookTarget) IsActive() (bool, error) {
|
func (target *WebhookTarget) IsActive() (bool, error) {
|
||||||
if err := target.init(); err != nil {
|
if err := target.init(); err != nil {
|
||||||
@ -127,7 +133,7 @@ func (target *WebhookTarget) isActive() (bool, error) {
|
|||||||
req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xnet.IsNetworkOrHostDown(err, false) {
|
if xnet.IsNetworkOrHostDown(err, false) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -142,7 +148,7 @@ func (target *WebhookTarget) isActive() (bool, error) {
|
|||||||
resp, err := target.httpClient.Do(req)
|
resp, err := target.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xnet.IsNetworkOrHostDown(err, true) {
|
if xnet.IsNetworkOrHostDown(err, true) {
|
||||||
return false, errNotConnected
|
return false, store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -165,7 +171,7 @@ func (target *WebhookTarget) Save(eventData event.Event) error {
|
|||||||
err := target.send(eventData)
|
err := target.send(eventData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xnet.IsNetworkOrHostDown(err, false) {
|
if xnet.IsNetworkOrHostDown(err, false) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -235,7 +241,7 @@ func (target *WebhookTarget) Send(eventKey string) error {
|
|||||||
|
|
||||||
if err := target.send(eventData); err != nil {
|
if err := target.send(eventData); err != nil {
|
||||||
if xnet.IsNetworkOrHostDown(err, false) {
|
if xnet.IsNetworkOrHostDown(err, false) {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -274,7 +280,7 @@ func (target *WebhookTarget) initWebhook() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !yes {
|
if !yes {
|
||||||
return errNotConnected
|
return store.ErrNotConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -284,11 +290,11 @@ func (target *WebhookTarget) initWebhook() error {
|
|||||||
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) {
|
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
var store Store
|
var queueStore store.Store[event.Event]
|
||||||
if args.QueueDir != "" {
|
if args.QueueDir != "" {
|
||||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
|
||||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
|
||||||
if err := store.Open(); err != nil {
|
if err := queueStore.Open(); err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err)
|
return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err)
|
||||||
}
|
}
|
||||||
@ -299,13 +305,13 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
|
|||||||
args: args,
|
args: args,
|
||||||
loggerOnce: loggerOnce,
|
loggerOnce: loggerOnce,
|
||||||
transport: transport,
|
transport: transport,
|
||||||
store: store,
|
store: queueStore,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
cancelCh: ctx.Done(),
|
cancelCh: ctx.Done(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if target.store != nil {
|
if target.store != nil {
|
||||||
streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce)
|
store.StreamItems(target.store, target, target.cancelCh, target.loggerOnce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return target, nil
|
return target, nil
|
||||||
|
@ -15,10 +15,11 @@
|
|||||||
// You should have received a copy of the GNU Affero General Public License
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package target
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
@ -26,38 +27,47 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultLimit = 100000 // Default store limit.
|
defaultLimit = 100000 // Default store limit.
|
||||||
eventExt = ".event"
|
defaultExt = ".unknown"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueueStore - Filestore for persisting events.
|
// errLimitExceeded error is sent when the maximum limit is reached.
|
||||||
type QueueStore struct {
|
var errLimitExceeded = errors.New("the maximum store limit reached")
|
||||||
|
|
||||||
|
// QueueStore - Filestore for persisting items.
|
||||||
|
type QueueStore[_ any] struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
entryLimit uint64
|
entryLimit uint64
|
||||||
directory string
|
directory string
|
||||||
|
fileExt string
|
||||||
|
|
||||||
entries map[string]int64 // key -> modtime as unix nano
|
entries map[string]int64 // key -> modtime as unix nano
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueStore - Creates an instance for QueueStore.
|
// NewQueueStore - Creates an instance for QueueStore.
|
||||||
func NewQueueStore(directory string, limit uint64) Store {
|
func NewQueueStore[I any](directory string, limit uint64, ext string) *QueueStore[I] {
|
||||||
if limit == 0 {
|
if limit == 0 {
|
||||||
limit = defaultLimit
|
limit = defaultLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
return &QueueStore{
|
if ext == "" {
|
||||||
|
ext = defaultExt
|
||||||
|
}
|
||||||
|
|
||||||
|
return &QueueStore[I]{
|
||||||
directory: directory,
|
directory: directory,
|
||||||
entryLimit: limit,
|
entryLimit: limit,
|
||||||
|
fileExt: ext,
|
||||||
entries: make(map[string]int64, limit),
|
entries: make(map[string]int64, limit),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open - Creates the directory if not present.
|
// Open - Creates the directory if not present.
|
||||||
func (store *QueueStore) Open() error {
|
func (store *QueueStore[_]) Open() error {
|
||||||
store.Lock()
|
store.Lock()
|
||||||
defer store.Unlock()
|
defer store.Unlock()
|
||||||
|
|
||||||
@ -78,7 +88,7 @@ func (store *QueueStore) Open() error {
|
|||||||
if file.IsDir() {
|
if file.IsDir() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
key := strings.TrimSuffix(file.Name(), eventExt)
|
key := strings.TrimSuffix(file.Name(), store.fileExt)
|
||||||
if fi, err := file.Info(); err == nil {
|
if fi, err := file.Info(); err == nil {
|
||||||
store.entries[key] = fi.ModTime().UnixNano()
|
store.entries[key] = fi.ModTime().UnixNano()
|
||||||
}
|
}
|
||||||
@ -87,44 +97,45 @@ func (store *QueueStore) Open() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// write - writes event to the directory.
|
// write - writes an item to the directory.
|
||||||
func (store *QueueStore) write(key string, e event.Event) error {
|
func (store *QueueStore[I]) write(key string, item I) error {
|
||||||
// Marshalls the event.
|
// Marshalls the item.
|
||||||
eventData, err := json.Marshal(e)
|
eventData, err := json.Marshal(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
path := filepath.Join(store.directory, key+eventExt)
|
path := filepath.Join(store.directory, key+store.fileExt)
|
||||||
if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil {
|
if err := os.WriteFile(path, eventData, os.FileMode(0o770)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Increment the event count.
|
// Increment the item count.
|
||||||
store.entries[key] = time.Now().UnixNano()
|
store.entries[key] = time.Now().UnixNano()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put - puts a event to the store.
|
// Put - puts an item to the store.
|
||||||
func (store *QueueStore) Put(e event.Event) error {
|
func (store *QueueStore[I]) Put(item I) error {
|
||||||
store.Lock()
|
store.Lock()
|
||||||
defer store.Unlock()
|
defer store.Unlock()
|
||||||
if uint64(len(store.entries)) >= store.entryLimit {
|
if uint64(len(store.entries)) >= store.entryLimit {
|
||||||
return errLimitExceeded
|
return errLimitExceeded
|
||||||
}
|
}
|
||||||
key, err := getNewUUID()
|
// Generate a new UUID for the key.
|
||||||
|
key, err := uuid.NewRandom()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return store.write(key, e)
|
return store.write(key.String(), item)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get - gets a event from the store.
|
// Get - gets an item from the store.
|
||||||
func (store *QueueStore) Get(key string) (event event.Event, err error) {
|
func (store *QueueStore[I]) Get(key string) (item I, err error) {
|
||||||
store.RLock()
|
store.RLock()
|
||||||
|
|
||||||
defer func(store *QueueStore) {
|
defer func(store *QueueStore[I]) {
|
||||||
store.RUnlock()
|
store.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Upon error we remove the entry.
|
// Upon error we remove the entry.
|
||||||
@ -133,31 +144,31 @@ func (store *QueueStore) Get(key string) (event event.Event, err error) {
|
|||||||
}(store)
|
}(store)
|
||||||
|
|
||||||
var eventData []byte
|
var eventData []byte
|
||||||
eventData, err = os.ReadFile(filepath.Join(store.directory, key+eventExt))
|
eventData, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return event, err
|
return item, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(eventData) == 0 {
|
if len(eventData) == 0 {
|
||||||
return event, os.ErrNotExist
|
return item, os.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = json.Unmarshal(eventData, &event); err != nil {
|
if err = json.Unmarshal(eventData, &item); err != nil {
|
||||||
return event, err
|
return item, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return event, nil
|
return item, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Del - Deletes an entry from the store.
|
// Del - Deletes an entry from the store.
|
||||||
func (store *QueueStore) Del(key string) error {
|
func (store *QueueStore[_]) Del(key string) error {
|
||||||
store.Lock()
|
store.Lock()
|
||||||
defer store.Unlock()
|
defer store.Unlock()
|
||||||
return store.del(key)
|
return store.del(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the entry count.
|
// Len returns the entry count.
|
||||||
func (store *QueueStore) Len() int {
|
func (store *QueueStore[_]) Len() int {
|
||||||
store.RLock()
|
store.RLock()
|
||||||
l := len(store.entries)
|
l := len(store.entries)
|
||||||
defer store.RUnlock()
|
defer store.RUnlock()
|
||||||
@ -165,8 +176,8 @@ func (store *QueueStore) Len() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// lockless call
|
// lockless call
|
||||||
func (store *QueueStore) del(key string) error {
|
func (store *QueueStore[_]) del(key string) error {
|
||||||
err := os.Remove(filepath.Join(store.directory, key+eventExt))
|
err := os.Remove(filepath.Join(store.directory, key+store.fileExt))
|
||||||
|
|
||||||
// Delete as entry no matter the result
|
// Delete as entry no matter the result
|
||||||
delete(store.entries, key)
|
delete(store.entries, key)
|
||||||
@ -175,7 +186,7 @@ func (store *QueueStore) del(key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List - lists all files registered in the store.
|
// List - lists all files registered in the store.
|
||||||
func (store *QueueStore) List() ([]string, error) {
|
func (store *QueueStore[_]) List() ([]string, error) {
|
||||||
store.RLock()
|
store.RLock()
|
||||||
l := make([]string, 0, len(store.entries))
|
l := make([]string, 0, len(store.entries))
|
||||||
for k := range store.entries {
|
for k := range store.entries {
|
||||||
@ -194,7 +205,7 @@ func (store *QueueStore) List() ([]string, error) {
|
|||||||
// list will read all entries from disk.
|
// list will read all entries from disk.
|
||||||
// Entries are returned sorted by modtime, oldest first.
|
// Entries are returned sorted by modtime, oldest first.
|
||||||
// Underlying entry list in store is *not* updated.
|
// Underlying entry list in store is *not* updated.
|
||||||
func (store *QueueStore) list() ([]os.DirEntry, error) {
|
func (store *QueueStore[_]) list() ([]os.DirEntry, error) {
|
||||||
files, err := os.ReadDir(store.directory)
|
files, err := os.ReadDir(store.directory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -215,3 +226,9 @@ func (store *QueueStore) list() ([]os.DirEntry, error) {
|
|||||||
|
|
||||||
return files, nil
|
return files, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extension will return the file extension used
|
||||||
|
// for the items stored in the queue.
|
||||||
|
func (store *QueueStore[_]) Extension() string {
|
||||||
|
return store.fileExt
|
||||||
|
}
|
@ -15,7 +15,7 @@
|
|||||||
// You should have received a copy of the GNU Affero General Public License
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package target
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
@ -23,48 +23,54 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/event"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TestItem struct {
|
||||||
|
Name string `json:"Name"`
|
||||||
|
Property string `json:"property"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
// TestDir
|
// TestDir
|
||||||
var queueDir = filepath.Join(os.TempDir(), "minio_test")
|
queueDir = filepath.Join(os.TempDir(), "minio_test")
|
||||||
|
// Sample test item.
|
||||||
|
testItem = TestItem{Name: "test-item", Property: "property"}
|
||||||
|
// Ext for test item
|
||||||
|
testItemExt = ".test"
|
||||||
|
)
|
||||||
|
|
||||||
// Sample test event.
|
// Initialize the queue store.
|
||||||
var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet}
|
func setUpQueueStore(directory string, limit uint64) (Store[TestItem], error) {
|
||||||
|
queueStore := NewQueueStore[TestItem](queueDir, limit, testItemExt)
|
||||||
// Initialize the store.
|
if oErr := queueStore.Open(); oErr != nil {
|
||||||
func setUpStore(directory string, limit uint64) (Store, error) {
|
|
||||||
store := NewQueueStore(queueDir, limit)
|
|
||||||
if oErr := store.Open(); oErr != nil {
|
|
||||||
return nil, oErr
|
return nil, oErr
|
||||||
}
|
}
|
||||||
return store, nil
|
return queueStore, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tear down store
|
// Tear down queue store.
|
||||||
func tearDownStore() error {
|
func tearDownQueueStore() error {
|
||||||
return os.RemoveAll(queueDir)
|
return os.RemoveAll(queueDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestQueueStorePut - tests for store.Put
|
// TestQueueStorePut - tests for store.Put
|
||||||
func TestQueueStorePut(t *testing.T) {
|
func TestQueueStorePut(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
t.Fatal("Failed to tear down store ", err)
|
t.Fatal("Failed to tear down store ", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
store, err := setUpStore(queueDir, 100)
|
store, err := setUpQueueStore(queueDir, 100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
||||||
// Put 100 events.
|
// Put 100 items.
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
if err := store.Put(testEvent); err != nil {
|
if err := store.Put(testItem); err != nil {
|
||||||
t.Fatal("Failed to put to queue store ", err)
|
t.Fatal("Failed to put to queue store ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Count the events.
|
// Count the items.
|
||||||
names, err := store.List()
|
names, err := store.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -77,71 +83,71 @@ func TestQueueStorePut(t *testing.T) {
|
|||||||
// TestQueueStoreGet - tests for store.Get
|
// TestQueueStoreGet - tests for store.Get
|
||||||
func TestQueueStoreGet(t *testing.T) {
|
func TestQueueStoreGet(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
t.Fatal("Failed to tear down store ", err)
|
t.Fatal("Failed to tear down store ", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
store, err := setUpStore(queueDir, 10)
|
store, err := setUpQueueStore(queueDir, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
||||||
// Put 10 events
|
// Put 10 items
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if err := store.Put(testEvent); err != nil {
|
if err := store.Put(testItem); err != nil {
|
||||||
t.Fatal("Failed to put to queue store ", err)
|
t.Fatal("Failed to put to queue store ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
eventKeys, err := store.List()
|
itemKeys, err := store.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// Get 10 events.
|
// Get 10 items.
|
||||||
if len(eventKeys) == 10 {
|
if len(itemKeys) == 10 {
|
||||||
for _, key := range eventKeys {
|
for _, key := range itemKeys {
|
||||||
event, eErr := store.Get(strings.TrimSuffix(key, eventExt))
|
item, eErr := store.Get(strings.TrimSuffix(key, testItemExt))
|
||||||
if eErr != nil {
|
if eErr != nil {
|
||||||
t.Fatal("Failed to Get the event from the queue store ", eErr)
|
t.Fatal("Failed to Get the item from the queue store ", eErr)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(testEvent, event) {
|
if !reflect.DeepEqual(testItem, item) {
|
||||||
t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
|
t.Fatalf("Failed to read the item: error: expected = %v, got = %v", testItem, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
t.Fatalf("List() Expected: 10, got %d", len(eventKeys))
|
t.Fatalf("List() Expected: 10, got %d", len(itemKeys))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestQueueStoreDel - tests for store.Del
|
// TestQueueStoreDel - tests for store.Del
|
||||||
func TestQueueStoreDel(t *testing.T) {
|
func TestQueueStoreDel(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
t.Fatal("Failed to tear down store ", err)
|
t.Fatal("Failed to tear down store ", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
store, err := setUpStore(queueDir, 20)
|
store, err := setUpQueueStore(queueDir, 20)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
||||||
// Put 20 events.
|
// Put 20 items.
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
if err := store.Put(testEvent); err != nil {
|
if err := store.Put(testItem); err != nil {
|
||||||
t.Fatal("Failed to put to queue store ", err)
|
t.Fatal("Failed to put to queue store ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
eventKeys, err := store.List()
|
itemKeys, err := store.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// Remove all the events.
|
// Remove all the items.
|
||||||
if len(eventKeys) == 20 {
|
if len(itemKeys) == 20 {
|
||||||
for _, key := range eventKeys {
|
for _, key := range itemKeys {
|
||||||
err := store.Del(strings.TrimSuffix(key, eventExt))
|
err := store.Del(strings.TrimSuffix(key, testItemExt))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("queue store Del failed with ", err)
|
t.Fatal("queue store Del failed with ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
t.Fatalf("List() Expected: 20, got %d", len(eventKeys))
|
t.Fatalf("List() Expected: 20, got %d", len(itemKeys))
|
||||||
}
|
}
|
||||||
|
|
||||||
names, err := store.List()
|
names, err := store.List()
|
||||||
@ -153,25 +159,25 @@ func TestQueueStoreDel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestQueueStoreLimit - tests the event limit for the store.
|
// TestQueueStoreLimit - tests the item limit for the store.
|
||||||
func TestQueueStoreLimit(t *testing.T) {
|
func TestQueueStoreLimit(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
t.Fatal("Failed to tear down store ", err)
|
t.Fatal("Failed to tear down store ", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// The max limit is set to 5.
|
// The max limit is set to 5.
|
||||||
store, err := setUpStore(queueDir, 5)
|
store, err := setUpQueueStore(queueDir, 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
if err := store.Put(testEvent); err != nil {
|
if err := store.Put(testItem); err != nil {
|
||||||
t.Fatal("Failed to put to queue store ", err)
|
t.Fatal("Failed to put to queue store ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Should not allow 6th Put.
|
// Should not allow 6th Put.
|
||||||
if err := store.Put(testEvent); err == nil {
|
if err := store.Put(testItem); err == nil {
|
||||||
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
|
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -179,20 +185,20 @@ func TestQueueStoreLimit(t *testing.T) {
|
|||||||
// TestQueueStoreLimit - tests for store.LimitN.
|
// TestQueueStoreLimit - tests for store.LimitN.
|
||||||
func TestQueueStoreListN(t *testing.T) {
|
func TestQueueStoreListN(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tearDownStore(); err != nil {
|
if err := tearDownQueueStore(); err != nil {
|
||||||
t.Fatal("Failed to tear down store ", err)
|
t.Fatal("Failed to tear down store ", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
store, err := setUpStore(queueDir, 10)
|
store, err := setUpQueueStore(queueDir, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if err := store.Put(testEvent); err != nil {
|
if err := store.Put(testItem); err != nil {
|
||||||
t.Fatal("Failed to put to queue store ", err)
|
t.Fatal("Failed to put to queue store ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Should return all the event keys in the store.
|
// Should return all the item keys in the store.
|
||||||
names, err := store.List()
|
names, err := store.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -203,7 +209,7 @@ func TestQueueStoreListN(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// re-open
|
// re-open
|
||||||
store, err = setUpStore(queueDir, 10)
|
store, err = setUpQueueStore(queueDir, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to create a queue store ", err)
|
t.Fatal("Failed to create a queue store ", err)
|
||||||
}
|
}
|
145
internal/store/store.go
Normal file
145
internal/store/store.go
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
// Copyright (c) 2015-2023 MinIO, Inc.
|
||||||
|
//
|
||||||
|
// This file is part of MinIO Object Storage stack
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/minio/minio/internal/logger"
|
||||||
|
xnet "github.com/minio/pkg/net"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
retryInterval = 3 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrNotConnected - indicates that the target connection is not active.
|
||||||
|
var ErrNotConnected = errors.New("not connected to target server/service")
|
||||||
|
|
||||||
|
// Target - store target interface
|
||||||
|
type Target interface {
|
||||||
|
Name() string
|
||||||
|
Send(key string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store - Used to persist items.
|
||||||
|
type Store[I any] interface {
|
||||||
|
Put(item I) error
|
||||||
|
Get(key string) (I, error)
|
||||||
|
Len() int
|
||||||
|
List() ([]string, error)
|
||||||
|
Del(key string) error
|
||||||
|
Open() error
|
||||||
|
Extension() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// replayItems - Reads the items from the store and replays.
|
||||||
|
func replayItems[I any](store Store[I], doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string {
|
||||||
|
itemKeyCh := make(chan string)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(itemKeyCh)
|
||||||
|
|
||||||
|
retryTicker := time.NewTicker(retryInterval)
|
||||||
|
defer retryTicker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
names, err := store.List()
|
||||||
|
if err != nil {
|
||||||
|
loggerOnce(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id)
|
||||||
|
} else {
|
||||||
|
for _, name := range names {
|
||||||
|
select {
|
||||||
|
case itemKeyCh <- strings.TrimSuffix(name, store.Extension()):
|
||||||
|
// Get next key.
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-retryTicker.C:
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return itemKeyCh
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendItems - Reads items from the store and re-plays.
|
||||||
|
func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
|
||||||
|
retryTicker := time.NewTicker(retryInterval)
|
||||||
|
defer retryTicker.Stop()
|
||||||
|
|
||||||
|
send := func(itemKey string) bool {
|
||||||
|
for {
|
||||||
|
err := target.Send(itemKey)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != ErrNotConnected && !xnet.IsConnResetErr(err) {
|
||||||
|
loggerOnce(context.Background(),
|
||||||
|
fmt.Errorf("target.Send() failed with '%w'", err),
|
||||||
|
target.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrying after 3secs back-off
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-retryTicker.C:
|
||||||
|
case <-doneCh:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case itemKey, ok := <-itemKeyCh:
|
||||||
|
if !ok {
|
||||||
|
// closed channel.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !send(itemKey) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamItems reads the keys from the store and replays the corresponding item to the target.
|
||||||
|
func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
|
||||||
|
go func() {
|
||||||
|
// Replays the items from the store.
|
||||||
|
itemKeyCh := replayItems(store, doneCh, loggerOnce, target.Name())
|
||||||
|
// Send items from the store.
|
||||||
|
sendItems(target, itemKeyCh, doneCh, loggerOnce)
|
||||||
|
}()
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user