From 21251d8c220dbe665d2826ab4a2cd283150aa20c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 7 Nov 2022 08:01:24 -0800 Subject: [PATCH] initialize streaming events without lazy initialization (#16016) --- internal/event/target/amqp.go | 13 ++++++++----- internal/event/target/elasticsearch.go | 13 ++++++++----- internal/event/target/kafka.go | 13 ++++++++----- internal/event/target/mqtt.go | 13 ++++++++----- internal/event/target/mysql.go | 13 ++++++++----- internal/event/target/nats.go | 13 ++++++++----- internal/event/target/nsq.go | 13 ++++++++----- internal/event/target/postgresql.go | 13 ++++++++----- internal/event/target/redis.go | 13 ++++++++----- internal/event/target/webhook.go | 15 +++++++++------ 10 files changed, 81 insertions(+), 51 deletions(-) diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index 357043fce..357e7a1bc 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -332,9 +332,6 @@ func (target *AMQPTarget) initAMQP() error { } target.conn = conn - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -349,11 +346,17 @@ func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTa } } - return &AMQPTarget{ + target := &AMQPTarget{ id: event.TargetID{ID: id, Name: "amqp"}, args: args, loggerOnce: loggerOnce, store: store, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 4ccdc6cc4..0a0a31dae 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -349,9 +349,6 @@ func (target *ElasticsearchTarget) initElasticsearch() error { return err } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -366,13 +363,19 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger } } - return &ElasticsearchTarget{ + target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, store: store, loggerOnce: loggerOnce, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } // ES Client definitions and methods diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 7ed59f200..06adbfcb2 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -329,9 +329,6 @@ func (target *KafkaTarget) initKafka() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -346,11 +343,17 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk } } - return &KafkaTarget{ + target := &KafkaTarget{ id: event.TargetID{ID: id, Name: "kafka"}, args: args, store: store, loggerOnce: loggerOnce, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index 72680786c..960c48b77 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -254,9 +254,6 @@ func (target *MQTTTarget) initMQTT() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -281,11 +278,17 @@ func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTa } } - return &MQTTTarget{ + target := &MQTTTarget{ id: event.TargetID{ID: id, Name: "mqtt"}, args: args, store: store, quitCh: make(chan struct{}), loggerOnce: loggerOnce, - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index bae803148..8e2fb31d9 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -382,9 +382,6 @@ func (target *MySQLTarget) initMySQL() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -413,12 +410,18 @@ func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQ args.DSN = config.FormatDSN() } - return &MySQLTarget{ + target := &MySQLTarget{ id: event.TargetID{ID: id, Name: "mysql"}, args: args, firstPing: false, store: store, loggerOnce: loggerOnce, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index 6f03fc9ad..4c9f0b1db 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -410,9 +410,6 @@ func (target *NATSTarget) initNATS() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -427,11 +424,17 @@ func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTa } } - return &NATSTarget{ + target := &NATSTarget{ id: event.TargetID{ID: id, Name: "nats"}, args: args, loggerOnce: loggerOnce, store: store, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index ecebd077e..261958def 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -244,9 +244,6 @@ func (target *NSQTarget) initNSQ() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -261,11 +258,17 @@ func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarge } } - return &NSQTarget{ + target := &NSQTarget{ id: event.TargetID{ID: id, Name: "nsq"}, args: args, loggerOnce: loggerOnce, store: store, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index b5eb43cce..a74b52ac1 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -376,9 +376,6 @@ func (target *PostgreSQLTarget) initPostgreSQL() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -414,7 +411,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn } } - return &PostgreSQLTarget{ + target := &PostgreSQLTarget{ id: event.TargetID{ID: id, Name: "postgresql"}, args: args, firstPing: false, @@ -422,5 +419,11 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOn connString: connStr, loggerOnce: loggerOnce, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index fcdedcae6..d448d0d9b 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -296,9 +296,6 @@ func (target *RedisTarget) initRedis() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) - } return nil } @@ -343,12 +340,18 @@ func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*Redi }, } - return &RedisTarget{ + target := &RedisTarget{ id: event.TargetID{ID: id, Name: "redis"}, args: args, pool: pool, store: store, loggerOnce: loggerOnce, quitCh: make(chan struct{}), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + + return target, nil } diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index 88c94047e..c2156bf08 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -179,7 +179,7 @@ func (target *WebhookTarget) send(eventData event.Event) error { return err } - req, err := http.NewRequest("POST", target.args.Endpoint.String(), bytes.NewReader(data)) + req, err := http.NewRequest(http.MethodPost, target.args.Endpoint.String(), bytes.NewReader(data)) if err != nil { return err } @@ -272,9 +272,6 @@ func (target *WebhookTarget) initWebhook() error { return errNotConnected } - if target.store != nil { - streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce) - } return nil } @@ -292,7 +289,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn } } - return &WebhookTarget{ + target := &WebhookTarget{ id: event.TargetID{ID: id, Name: "webhook"}, args: args, loggerOnce: loggerOnce, @@ -300,5 +297,11 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn store: store, cancel: cancel, cancelCh: ctx.Done(), - }, nil + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.cancelCh, target.loggerOnce) + } + + return target, nil }