non-blocking initialization of bucket target notifications (#15571)

This commit is contained in:
Anis Elleuch
2022-09-28 01:23:28 +01:00
committed by GitHub
parent 94dbb4a427
commit 86bb48792c
21 changed files with 827 additions and 732 deletions

View File

@@ -111,12 +111,16 @@ func (a *AMQPArgs) Validate() error {
// AMQPTarget - AMQP target
type AMQPTarget struct {
lazyInit lazyInit
id event.TargetID
args AMQPArgs
conn *amqp.Connection
connMutex sync.Mutex
store Store
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns TargetID.
@@ -126,6 +130,14 @@ func (target *AMQPTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *AMQPTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *AMQPTarget) isActive() (bool, error) {
ch, _, err := target.channel()
if err != nil {
return false, err
@@ -136,11 +148,6 @@ func (target *AMQPTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *AMQPTarget) HasQueueStore() bool {
return target.store != nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, error) {
var err error
var conn *amqp.Connection
@@ -256,6 +263,10 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms
// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
@@ -270,6 +281,10 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
// Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
ch, confirms, err := target.channel()
if err != nil {
return err
@@ -296,51 +311,49 @@ func (target *AMQPTarget) Send(eventKey string) error {
// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
close(target.quitCh)
if target.conn != nil {
return target.conn.Close()
}
return nil
}
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*AMQPTarget, error) {
var conn *amqp.Connection
var err error
func (target *AMQPTarget) init() error {
return target.lazyInit.Do(target.initAMQP)
}
var store Store
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
conn, err = amqp.Dial(args.URL.String())
func (target *AMQPTarget) initAMQP() error {
conn, err := amqp.Dial(target.args.URL.String())
if err != nil {
if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
if IsConnRefusedErr(err) || IsConnResetErr(err) {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
return err
}
target.conn = conn
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err)
}
}
return target, nil
return &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -152,11 +152,14 @@ func (a ElasticsearchArgs) Validate() error {
// ElasticsearchTarget - Elasticsearch target.
type ElasticsearchTarget struct {
lazyInit lazyInit
id event.TargetID
args ElasticsearchArgs
client esClient
store Store
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -164,13 +167,15 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *ElasticsearchTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *ElasticsearchTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *ElasticsearchTarget) isActive() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -184,6 +189,10 @@ func (target *ElasticsearchTarget) IsActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
@@ -246,6 +255,10 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -277,6 +290,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {
// Close - does nothing and available for interface compatibility.
func (target *ElasticsearchTarget) Close() error {
close(target.quitCh)
if target.client != nil {
// Stops the background processes that the client is running.
target.client.stop()
@@ -319,42 +333,46 @@ func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*ElasticsearchTarget, error) {
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
}
func (target *ElasticsearchTarget) init() error {
return target.lazyInit.Do(target.initElasticsearch)
}
func (target *ElasticsearchTarget) initElasticsearch() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := target.checkAndInitClient(ctx)
if err != nil {
if target.store == nil || err != errNotConnected {
if err != errNotConnected {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
return err
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err)
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
return &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}
// ES Client definitions and methods

View File

@@ -23,6 +23,7 @@ import (
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"os"
@@ -122,12 +123,15 @@ func (k KafkaArgs) Validate() error {
// KafkaTarget - Kafka target.
type KafkaTarget struct {
lazyInit lazyInit
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
store Store
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -135,13 +139,15 @@ func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *KafkaTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *KafkaTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *KafkaTarget) isActive() (bool, error) {
if !target.args.pingBrokers() {
return false, errNotConnected
}
@@ -150,10 +156,14 @@ func (target *KafkaTarget) IsActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -189,8 +199,12 @@ func (target *KafkaTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to Kafka.
func (target *KafkaTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
var err error
_, err = target.IsActive()
_, err = target.isActive()
if err != nil {
return err
}
@@ -234,6 +248,7 @@ func (target *KafkaTarget) Send(eventKey string) error {
// Close - closes underneath kafka connection.
func (target *KafkaTarget) Close() error {
close(target.quitCh)
if target.producer != nil {
return target.producer.Close()
}
@@ -251,21 +266,19 @@ func (k KafkaArgs) pingBrokers() bool {
return false
}
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*KafkaTarget, error) {
func (target *KafkaTarget) init() error {
return target.lazyInit.Do(target.initKafka)
}
func (target *KafkaTarget) initKafka() error {
args := target.args
config := sarama.NewConfig()
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
loggerOnce: loggerOnce,
}
if args.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(args.Version)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
return err
}
config.Version = kafkaVersion
}
@@ -278,7 +291,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
return err
}
config.Net.TLS.Enable = args.TLS.Enable
@@ -298,33 +311,46 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
brokers = append(brokers, broker.String())
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
if store == nil || err != sarama.ErrOutOfBrokers {
if err != sarama.ErrOutOfBrokers {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.producer.Close()
return err
}
target.producer = producer
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
return target, nil
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err)
}
}
return &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
store: store,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -0,0 +1,50 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"sync"
"sync/atomic"
)
// Inspired from Golang sync.Once but it is only marked
// initialized when the provided function returns nil.
type lazyInit struct {
done uint32
m sync.Mutex
}
func (l *lazyInit) Do(f func() error) error {
if atomic.LoadUint32(&l.done) == 0 {
return l.doSlow(f)
}
return nil
}
func (l *lazyInit) doSlow(f func() error) error {
l.m.Lock()
defer l.m.Unlock()
if atomic.LoadUint32(&l.done) == 0 {
if f() == nil {
// Mark as done only when f() is successful
atomic.StoreUint32(&l.done, 1)
}
}
return nil
}

View File

@@ -18,7 +18,6 @@
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@@ -36,7 +35,7 @@ import (
)
const (
reconnectInterval = 5 // In Seconds
reconnectInterval = 5 * time.Second
storePrefix = "minio"
)
@@ -107,6 +106,8 @@ func (m MQTTArgs) Validate() error {
// MQTTTarget - MQTT target.
type MQTTTarget struct {
lazyInit lazyInit
id event.TargetID
args MQTTArgs
client mqtt.Client
@@ -120,13 +121,15 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MQTTTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MQTTTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *MQTTTarget) isActive() (bool, error) {
if !target.client.IsConnectionOpen() {
return false, errNotConnected
}
@@ -147,7 +150,7 @@ func (target *MQTTTarget) send(eventData event.Event) error {
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
if !token.WaitTimeout(reconnectInterval * time.Second) {
if !token.WaitTimeout(reconnectInterval) {
return errNotConnected
}
return token.Error()
@@ -155,8 +158,12 @@ func (target *MQTTTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to MQTT.
func (target *MQTTTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
// Do not send if the connection is not active.
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -182,12 +189,16 @@ func (target *MQTTTarget) Send(eventKey string) error {
// Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
// Do not send if the connection is not active.
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -202,17 +213,12 @@ func (target *MQTTTarget) Close() error {
return nil
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MQTTTarget, error) {
if args.MaxReconnectInterval == 0 {
// Default interval
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
args.MaxReconnectInterval = 10 * time.Minute
}
func (target *MQTTTarget) init() error {
return target.lazyInit.Do(target.initMQTT)
}
if args.KeepAlive == 0 {
args.KeepAlive = 10 * time.Second
}
func (target *MQTTTarget) initMQTT() error {
args := target.args
// Using hex here, to make sure we avoid 23
// character limit on client_id according to
@@ -229,61 +235,57 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}).
AddBroker(args.Broker.String())
client := mqtt.NewClient(options)
target.client = mqtt.NewClient(options)
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
client: client,
quitCh: make(chan struct{}),
loggerOnce: loggerOnce,
token := target.client.Connect()
ok := token.WaitTimeout(reconnectInterval)
if !ok {
return errNotConnected
}
if token.Error() != nil {
return token.Error()
}
token := client.Connect()
retryRegister := func() {
for {
retry:
select {
case <-doneCh:
return
case <-target.quitCh:
return
default:
ok := token.WaitTimeout(reconnectInterval * time.Second)
if ok && token.Error() != nil {
target.loggerOnce(context.Background(),
fmt.Errorf("Previous connect failed with %w attempting a reconnect",
token.Error()),
target.ID().String())
time.Sleep(reconnectInterval * time.Second)
token = client.Connect()
goto retry
}
if ok {
// Successfully connected.
return
}
}
}
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTarget, error) {
if args.MaxReconnectInterval == 0 {
// Default interval
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
args.MaxReconnectInterval = 10 * time.Minute
}
if args.KeepAlive == 0 {
args.KeepAlive = 10 * time.Second
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err)
}
if !test {
go retryRegister()
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
} else if token.Wait() && token.Error() != nil {
return target, token.Error()
}
return target, nil
return &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
store: store,
quitCh: make(chan struct{}),
loggerOnce: loggerOnce,
}, nil
}

View File

@@ -145,6 +145,8 @@ func (m MySQLArgs) Validate() error {
// MySQLTarget - MySQL target.
type MySQLTarget struct {
lazyInit lazyInit
id event.TargetID
args MySQLArgs
updateStmt *sql.Stmt
@@ -154,6 +156,8 @@ type MySQLTarget struct {
store Store
firstPing bool
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -161,24 +165,15 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MySQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MySQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, sErr := sql.Open("mysql", target.args.DSN)
if sErr != nil {
return false, sErr
}
target.db = db
if target.args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(target.args.MaxOpenConnections)
}
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *MySQLTarget) isActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
@@ -190,10 +185,14 @@ func (target *MySQLTarget) IsActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -244,7 +243,11 @@ func (target *MySQLTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to MySQL.
func (target *MySQLTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
}
@@ -281,6 +284,7 @@ func (target *MySQLTarget) Send(eventKey string) error {
// Close - closes underneath connections to MySQL database.
func (target *MySQLTarget) Close() error {
close(target.quitCh)
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
@@ -333,8 +337,68 @@ func (target *MySQLTarget) executeStmts() error {
return nil
}
func (target *MySQLTarget) init() error {
return target.lazyInit.Do(target.initMySQL)
}
func (target *MySQLTarget) initMySQL() error {
args := target.args
db, err := sql.Open("mysql", args.DSN)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
err = target.db.Ping()
if err != nil {
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
} else {
target.firstPing = true
}
}
if err != nil {
target.db.Close()
return err
}
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MySQLTarget, error) {
func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err)
}
}
if args.DSN == "" {
config := mysql.Config{
User: args.User,
@@ -349,57 +413,12 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
args.DSN = config.FormatDSN()
}
target := &MySQLTarget{
return &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
firstPing: false,
store: store,
loggerOnce: loggerOnce,
}
db, err := sql.Open("mysql", args.DSN)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -23,6 +23,7 @@ import (
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
@@ -212,6 +213,8 @@ func (n NATSArgs) connectStan() (stan.Conn, error) {
// NATSTarget - NATS target.
type NATSTarget struct {
lazyInit lazyInit
id event.TargetID
args NATSArgs
natsConn *nats.Conn
@@ -219,6 +222,7 @@ type NATSTarget struct {
jstream nats.JetStream
store Store
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -226,13 +230,15 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NATSTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NATSTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *NATSTarget) isActive() (bool, error) {
var connErr error
if target.args.Streaming.Enable {
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
@@ -270,10 +276,14 @@ func (target *NATSTarget) IsActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -311,7 +321,11 @@ func (target *NATSTarget) send(eventData event.Event) error {
// Send - sends event to Nats.
func (target *NATSTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
}
@@ -335,6 +349,7 @@ func (target *NATSTarget) Send(eventKey string) error {
// Close - closes underneath connections to NATS server.
func (target *NATSTarget) Close() (err error) {
close(target.quitCh)
if target.stanConn != nil {
// closing the streaming connection does not close the provided NATS connection.
if target.stanConn.NatsConn() != nil {
@@ -350,66 +365,73 @@ func (target *NATSTarget) Close() (err error) {
return nil
}
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn
var jstream nats.JetStream
func (target *NATSTarget) init() error {
return target.lazyInit.Do(target.initNATS)
}
func (target *NATSTarget) initNATS() error {
args := target.args
var err error
var store Store
target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
if args.Streaming.Enable {
target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID().String())
var stanConn stan.Conn
stanConn, err = args.connectStan()
target.stanConn = stanConn
} else {
var natsConn *nats.Conn
natsConn, err = args.connectNats()
target.natsConn = natsConn
}
if err != nil {
if store == nil || err.Error() != nats.ErrNoServers.Error() {
if err.Error() != nats.ErrNoServers.Error() {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
return err
}
if target.natsConn != nil && args.JetStream.Enable {
var jstream nats.JetStream
jstream, err = target.natsConn.JetStream()
if err != nil {
if store == nil || err.Error() != nats.ErrNoServers.Error() {
if err.Error() != nats.ErrNoServers.Error() {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
return err
}
target.jstream = jstream
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
return target, nil
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err)
}
}
return &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -22,6 +22,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
@@ -88,12 +89,15 @@ func (n NSQArgs) Validate() error {
// NSQTarget - NSQ target.
type NSQTarget struct {
lazyInit lazyInit
id event.TargetID
args NSQArgs
producer *nsq.Producer
store Store
config *nsq.Config
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -101,13 +105,15 @@ func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NSQTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NSQTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *NSQTarget) isActive() (bool, error) {
if target.producer == nil {
producer, err := nsq.NewProducer(target.args.NSQDAddress.String(), target.config)
if err != nil {
@@ -128,10 +134,14 @@ func (target *NSQTarget) IsActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -156,7 +166,11 @@ func (target *NSQTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to NSQ.
func (target *NSQTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
}
@@ -181,6 +195,7 @@ func (target *NSQTarget) Send(eventKey string) error {
// Close - closes underneath connections to NSQD server.
func (target *NSQTarget) Close() (err error) {
close(target.quitCh)
if target.producer != nil {
// this blocks until complete:
target.producer.Stop()
@@ -188,8 +203,13 @@ func (target *NSQTarget) Close() (err error) {
return nil
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NSQTarget, error) {
func (target *NSQTarget) init() error {
return target.lazyInit.Do(target.initNSQ)
}
func (target *NSQTarget) initNSQ() error {
args := target.args
config := nsq.NewConfig()
if args.TLS.Enable {
config.TlsV1 = true
@@ -197,47 +217,55 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce lo
InsecureSkipVerify: args.TLS.SkipVerify,
}
}
var store Store
target := &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
config: config,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
target.config = config
producer, err := nsq.NewProducer(args.NSQDAddress.String(), config)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
return err
}
target.producer = producer
if err := target.producer.Ping(); err != nil {
err = target.producer.Ping()
if err != nil {
// To treat "connection refused" errors as errNotConnected.
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.producer.Stop()
return err
}
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err)
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
return &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
loggerOnce: loggerOnce,
store: store,
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -137,6 +137,8 @@ func (p PostgreSQLArgs) Validate() error {
// PostgreSQLTarget - PostgreSQL target.
type PostgreSQLTarget struct {
lazyInit lazyInit
id event.TargetID
args PostgreSQLArgs
updateStmt *sql.Stmt
@@ -147,6 +149,7 @@ type PostgreSQLTarget struct {
firstPing bool
connString string
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -154,24 +157,15 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *PostgreSQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *PostgreSQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, err := sql.Open("postgres", target.connString)
if err != nil {
return false, err
}
target.db = db
if target.args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(target.args.MaxOpenConnections)
}
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *PostgreSQLTarget) isActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
@@ -183,10 +177,14 @@ func (target *PostgreSQLTarget) IsActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -241,7 +239,11 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err
}
@@ -277,6 +279,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error {
// Close - closes underneath connections to PostgreSQL database.
func (target *PostgreSQLTarget) Close() error {
close(target.quitCh)
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
@@ -329,8 +332,58 @@ func (target *PostgreSQLTarget) executeStmts() error {
return nil
}
func (target *PostgreSQLTarget) init() error {
return target.lazyInit.Do(target.initPostgreSQL)
}
func (target *PostgreSQLTarget) initPostgreSQL() error {
args := target.args
db, err := sql.Open("postgres", target.connString)
if err != nil {
return err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
err = target.db.Ping()
if err != nil {
if !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
} else {
target.firstPing = true
}
}
if err != nil {
target.db.Close()
return err
}
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*PostgreSQLTarget, error) {
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOnce) (*PostgreSQLTarget, error) {
params := []string{args.ConnectionString}
if args.ConnectionString == "" {
params = []string{}
@@ -352,57 +405,22 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
}
connStr := strings.Join(params, " ")
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
connString: connStr,
loggerOnce: loggerOnce,
}
db, err := sql.Open("postgres", connStr)
if err != nil {
return target, err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err)
}
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
return &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
store: store,
connString: connStr,
loggerOnce: loggerOnce,
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -117,12 +117,15 @@ func (r RedisArgs) validateFormat(c redis.Conn) error {
// RedisTarget - Redis target.
type RedisTarget struct {
lazyInit lazyInit
id event.TargetID
args RedisArgs
pool *redis.Pool
store Store
firstPing bool
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
@@ -130,13 +133,15 @@ func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *RedisTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *RedisTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *RedisTarget) isActive() (bool, error) {
conn := target.pool.Get()
defer conn.Close()
@@ -152,10 +157,14 @@ func (target *RedisTarget) IsActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
_, err := target.isActive()
if err != nil {
return err
}
@@ -204,6 +213,10 @@ func (target *RedisTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to redis.
func (target *RedisTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
conn := target.pool.Get()
defer conn.Close()
@@ -248,11 +261,58 @@ func (target *RedisTarget) Send(eventKey string) error {
// Close - releases the resources used by the pool.
func (target *RedisTarget) Close() error {
close(target.quitCh)
return target.pool.Close()
}
func (target *RedisTarget) init() error {
return target.lazyInit.Do(target.initRedis)
}
func (target *RedisTarget) initRedis() error {
conn := target.pool.Get()
defer conn.Close()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
target.loggerOnce(context.Background(), pingErr, target.ID().String())
}
return pingErr
}
if err := target.args.validateFormat(conn); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return err
}
target.firstPing = true
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*RedisTarget, error) {
func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err)
}
}
pool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 2 * 60 * time.Second,
@@ -283,48 +343,12 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
},
}
var store Store
target := &RedisTarget{
return &RedisTarget{
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
store: store,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID().String())
return target, oErr
}
target.store = store
}
conn := target.pool.Get()
defer conn.Close()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
target.loggerOnce(context.Background(), pingErr, target.ID().String())
return target, pingErr
}
} else {
if err := target.args.validateFormat(conn); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
quitCh: make(chan struct{}),
}, nil
}

View File

@@ -47,7 +47,7 @@ type Store interface {
}
// replayEvents - Reads the events from the store and replays.
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id event.TargetID) <-chan string {
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string {
eventKeyCh := make(chan string)
go func() {
@@ -59,7 +59,7 @@ func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce
for {
names, err := store.List()
if err != nil {
loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id.String())
loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id)
} else {
for _, name := range names {
select {
@@ -141,3 +141,12 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str
}
}
}
func streamEventsFromStore(store Store, target event.Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
go func() {
// Replays the events from the store.
eventKeyCh := replayEvents(store, doneCh, loggerOnce, target.ID().String())
// Send events from the store.
sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}()
}

View File

@@ -90,25 +90,31 @@ func (w WebhookArgs) Validate() error {
// WebhookTarget - Webhook target.
type WebhookTarget struct {
lazyInit lazyInit
id event.TargetID
args WebhookArgs
transport *http.Transport
httpClient *http.Client
store Store
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns target ID.
func (target WebhookTarget) ID() event.TargetID {
func (target *WebhookTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *WebhookTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *WebhookTarget) isActive() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -143,6 +149,10 @@ func (target *WebhookTarget) IsActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured,
// which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
@@ -205,6 +215,10 @@ func (target *WebhookTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to webhook.
func (target *WebhookTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
@@ -228,52 +242,60 @@ func (target *WebhookTarget) Send(eventKey string) error {
// Close - does nothing and available for interface compatibility.
func (target *WebhookTarget) Close() error {
close(target.quitCh)
return nil
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport, test bool) (*WebhookTarget, error) {
var store Store
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
loggerOnce: loggerOnce,
}
func (target *WebhookTarget) init() error {
return target.lazyInit.Do(target.initWebhook)
}
if target.args.ClientCert != "" && target.args.ClientKey != "" {
manager, err := certs.NewManager(ctx, target.args.ClientCert, target.args.ClientKey, tls.LoadX509KeyPair)
// Only called from init()
func (target *WebhookTarget) initWebhook() error {
args := target.args
transport := target.transport
if args.ClientCert != "" && args.ClientKey != "" {
manager, err := certs.NewManager(context.Background(), args.ClientCert, args.ClientKey, tls.LoadX509KeyPair)
if err != nil {
return target, err
return err
}
manager.ReloadOnSignal(syscall.SIGHUP) // allow reloads upon SIGHUP
transport.TLSClientConfig.GetClientCertificate = manager.GetClientCertificate
}
target.httpClient = &http.Client{Transport: transport}
yes, err := target.isActive()
if err != nil {
return err
}
if !yes {
return errNotConnected
}
if target.store != nil {
streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce)
}
return nil
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID().String())
return target, err
}
target.store = store
}
_, err := target.IsActive()
if err != nil {
if target.store == nil || err != errNotConnected {
target.loggerOnce(ctx, err, target.ID().String())
return target, err
return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err)
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, ctx.Done(), target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, ctx.Done(), target.loggerOnce)
}
return target, nil
return &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
loggerOnce: loggerOnce,
transport: transport,
store: store,
quitCh: make(chan struct{}),
}, nil
}