From 86bb48792c1ee52e9b2fde33c8ed211ab8ecaf13 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 28 Sep 2022 01:23:28 +0100 Subject: [PATCH] non-blocking initialization of bucket target notifications (#15571) --- cmd/admin-handlers.go | 14 -- cmd/config-current.go | 9 +- cmd/event-notification.go | 41 ++-- cmd/globals.go | 2 - cmd/server-main.go | 4 +- cmd/test-utils_test.go | 2 + internal/config/notify/parse.go | 285 ++++++------------------- internal/event/target/amqp.go | 85 ++++---- internal/event/target/elasticsearch.go | 80 ++++--- internal/event/target/kafka.go | 102 +++++---- internal/event/target/lazyinit.go | 50 +++++ internal/event/target/mqtt.go | 138 ++++++------ internal/event/target/mysql.go | 153 +++++++------ internal/event/target/nats.go | 108 ++++++---- internal/event/target/nsq.go | 108 ++++++---- internal/event/target/postgresql.go | 146 +++++++------ internal/event/target/redis.go | 118 ++++++---- internal/event/target/store.go | 13 +- internal/event/target/webhook.go | 94 ++++---- internal/event/targetlist.go | 1 - internal/event/targetlist_test.go | 6 +- 21 files changed, 827 insertions(+), 732 deletions(-) create mode 100644 internal/event/target/lazyinit.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index d88ae40cf..3c52a83ba 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2555,20 +2555,6 @@ func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus { lambdaMap[targetID.Name] = list } - for _, tgt := range globalEnvTargetList.Targets() { - targetIDStatus := make(map[string]madmin.Status) - active, _ := tgt.IsActive() - targetID := tgt.ID() - if active { - targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOnline)} - } else { - targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOffline)} - } - list := lambdaMap[targetID.Name] - list = append(list, targetIDStatus) - lambdaMap[targetID.Name] = list - } - notify := make([]map[string][]madmin.TargetIDStatus, len(lambdaMap)) counter := 0 for key, value := range lambdaMap { diff --git a/cmd/config-current.go b/cmd/config-current.go index 06e59ba8a..8404d7f42 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -388,7 +388,7 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er } if config.NotifySubSystems.Contains(subSys) { - if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalEventNotifier.ConfiguredTargetIDs(), subSys); err != nil { + if err := notify.TestSubSysNotificationTargets(GlobalContext, s, subSys, NewGatewayHTTPTransport()); err != nil { return err } } @@ -547,12 +547,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { transport := NewGatewayHTTPTransport() - globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, transport, false) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err)) - } - - globalEnvTargetList, err = notify.GetNotificationTargets(GlobalContext, newServerConfig(), transport, true) + globalConfigTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err)) } diff --git a/cmd/event-notification.go b/cmd/event-notification.go index fde2cdd8b..267309d9f 100644 --- a/cmd/event-notification.go +++ b/cmd/event-notification.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "errors" "fmt" "net/url" "strings" @@ -38,16 +39,18 @@ type EventNotifier struct { targetResCh chan event.TargetIDResult bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap + eventsQueue chan eventArgs } // NewEventNotifier - creates new event notification object. func NewEventNotifier() *EventNotifier { - // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init() + // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.InitBucketTargets() return &EventNotifier{ targetList: event.NewTargetList(), targetResCh: make(chan event.TargetIDResult), bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), + eventsQueue: make(chan eventArgs, 10000), } } @@ -64,7 +67,7 @@ func (evnot *EventNotifier) GetARNList(onlyActive bool) []string { // This list is only meant for external targets, filter // this out pro-actively. if !strings.HasPrefix(targetID.ID, "httpclient+") { - if onlyActive && !target.HasQueueStore() { + if onlyActive { if _, err := target.IsActive(); err != nil { continue } @@ -91,7 +94,7 @@ func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) { evnot.AddRulesMap(bucket.Name, config.ToRulesMap()) } -// InitBucketTargets - initializes notification system from notification.xml of all buckets. +// InitBucketTargets - initializes event notification system from notification.xml of all buckets. func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error { if objAPI == nil { return errServerNotInitialized @@ -102,7 +105,15 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object return nil } - logger.LogIf(ctx, evnot.targetList.Add(globalConfigTargetList.Targets()...)) + if err := evnot.targetList.Add(globalConfigTargetList.Targets()...); err != nil { + return err + } + + go func() { + for e := range evnot.eventsQueue { + evnot.send(e) + } + }() go func() { for res := range evnot.targetResCh { @@ -166,14 +177,8 @@ func (evnot *EventNotifier) ConfiguredTargetIDs() []event.TargetID { } } } - // Filter out targets configured via env - var tIDs []event.TargetID - for _, targetID := range targetIDs { - if !globalEnvTargetList.Exists(targetID) { - tIDs = append(tIDs, targetID) - } - } - return tIDs + + return targetIDs } // RemoveNotification - removes all notification configuration for bucket name. @@ -207,8 +212,18 @@ func (evnot *EventNotifier) RemoveAllRemoteTargets() { } } -// Send - sends event data to all matching targets. +// Send - sends the event to all registered notification targets func (evnot *EventNotifier) Send(args eventArgs) { + select { + case evnot.eventsQueue <- args: + default: + // A new goroutine is created for each notification job, eventsQueue is + // drained quickly and is not expected to be filled with any scenario. + logger.LogIf(context.Background(), errors.New("internal events queue unexpectedly full")) + } +} + +func (evnot *EventNotifier) send(args eventArgs) { evnot.RLock() targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) evnot.RUnlock() diff --git a/cmd/globals.go b/cmd/globals.go index 13afe315b..7de8a0567 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -192,8 +192,6 @@ var ( globalEventNotifier *EventNotifier globalConfigTargetList *event.TargetList - // globalEnvTargetList has list of targets configured via env. - globalEnvTargetList *event.TargetList globalBucketMetadataSys *BucketMetadataSys globalBucketMonitor *bandwidth.Monitor diff --git a/cmd/server-main.go b/cmd/server-main.go index f8cf8a7aa..808355f14 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -638,8 +638,8 @@ func serverMain(ctx *cli.Context) { // Initialize site replication manager. globalSiteReplicationSys.Init(GlobalContext, newObject) - // Initialize bucket notification targets. - globalEventNotifier.InitBucketTargets(GlobalContext, newObject) + // Initialize bucket notification system + logger.LogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject)) // initialize the new disk cache objects. if globalCacheConfig.Enabled { diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index dccffaba2..18746bdd1 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -373,6 +373,8 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second) + globalEventNotifier.InitBucketTargets(ctx, objLayer) + return testServer } diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 60b09e121..be750f3f9 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -42,361 +42,210 @@ const ( // ErrTargetsOffline - Indicates single/multiple target failures. var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets") -// TestNotificationTargets is similar to GetNotificationTargets() -// avoids explicit registration. -func TestNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID) error { - test := true - returnOnTargetError := true - targets, err := RegisterNotificationTargets(ctx, cfg, transport, targetIDs, test, returnOnTargetError) - if err == nil { - // Close all targets since we are only testing connections. - for _, t := range targets.TargetMap() { - _ = t.Close() - } - } - - return err -} - // TestSubSysNotificationTargets - tests notification targets of given subsystem -func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, subSys string) error { +func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) error { if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil { return err } - targetList := event.NewTargetList() - targetsOffline, err := fetchSubSysTargets(ctx, cfg, transport, true, true, subSys, targetList) - if err == nil { - // Close all targets since we are only testing connections. - for _, t := range targetList.TargetMap() { - _ = t.Close() - } - } - if targetsOffline { - return ErrTargetsOffline - } - - return err -} - -// GetNotificationTargets registers and initializes all notification -// targets, returns error if any. -func GetNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool) (*event.TargetList, error) { - returnOnTargetError := false - return RegisterNotificationTargets(ctx, cfg, transport, nil, test, returnOnTargetError) -} - -// RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig. -// A new notification target is added like below -// * Add a new target in pkg/event/target package. -// * Add newly added target configuration to serverConfig.Notify.. -// * Handle the configuration in this function to create/add into TargetList. -func RegisterNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, test bool, returnOnTargetError bool) (*event.TargetList, error) { - targetList, err := FetchRegisteredTargets(ctx, cfg, transport, test, returnOnTargetError) + targetList, err := fetchSubSysTargets(ctx, cfg, subSys, transport) if err != nil { - return targetList, err + return err } - if test { - // Verify if user is trying to disable already configured - // notification targets, based on their target IDs - for _, targetID := range targetIDs { - if !targetList.Exists(targetID) { - return nil, config.Errorf( - "Unable to disable currently configured targets '%v'", - targetID) - } + for _, target := range targetList { + defer target.Close() + } + + for _, target := range targetList { + yes, err := target.IsActive() + if err != nil || !yes { + return ErrTargetsOffline } } - return targetList, nil + return nil } -func fetchSubSysTargets(ctx context.Context, cfg config.Config, - transport *http.Transport, test bool, returnOnTargetError bool, - subSys string, targetList *event.TargetList, -) (targetsOffline bool, err error) { - targetsOffline = false +func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) (targets []event.Target, err error) { if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil { - return targetsOffline, err + return nil, err } switch subSys { case config.NotifyAMQPSubSys: amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range amqpTargets { if !args.Enable { continue } - newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewAMQPTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyESSubSys: esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range esTargets { if !args.Enable { continue } - newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewElasticsearchTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) + } case config.NotifyKafkaSubSys: kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range kafkaTargets { if !args.Enable { continue } args.TLS.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewKafkaTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) + } case config.NotifyMQTTSubSys: mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range mqttTargets { if !args.Enable { continue } args.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewMQTTTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyMySQLSubSys: mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range mysqlTargets { if !args.Enable { continue } - newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewMySQLTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyNATSSubSys: natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range natsTargets { if !args.Enable { continue } - newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewNATSTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyNSQSubSys: nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range nsqTargets { if !args.Enable { continue } - newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewNSQTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyPostgresSubSys: postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range postgresTargets { if !args.Enable { continue } - newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewPostgreSQLTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyRedisSubSys: redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys]) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range redisTargets { if !args.Enable { continue } - newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + t, err := target.NewRedisTarget(id, args, logger.LogOnceIf) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } case config.NotifyWebhookSubSys: webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport) if err != nil { - return targetsOffline, err + return nil, err } for id, args := range webhookTargets { if !args.Enable { continue } - newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test) + t, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport) if err != nil { - targetsOffline = true - if returnOnTargetError { - return targetsOffline, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return targetsOffline, err - } + return nil, err } + targets = append(targets, t) } - } - return targetsOffline, nil + return targets, nil } -// FetchRegisteredTargets - Returns a set of configured TargetList -// If `returnOnTargetError` is set to true, The function returns when a target initialization fails -// Else, the function will return a complete TargetList irrespective of errors -func FetchRegisteredTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool, returnOnTargetError bool) (_ *event.TargetList, err error) { +// FetchEnabledTargets - Returns a set of configured TargetList +func FetchEnabledTargets(ctx context.Context, cfg config.Config, transport *http.Transport) (_ *event.TargetList, err error) { targetList := event.NewTargetList() - var targetsOffline bool - - defer func() { - // Automatically close all connections to targets when an error occur. - // Close all the targets if returnOnTargetError is set - // Else, close only the failed targets - if err != nil && returnOnTargetError { - for _, t := range targetList.TargetMap() { - _ = t.Close() + for _, subSys := range config.NotifySubSystems.ToSlice() { + targets, err := fetchSubSysTargets(ctx, cfg, subSys, transport) + if err != nil { + return nil, err + } + for _, t := range targets { + if err = targetList.Add(t); err != nil { + return nil, err } } - }() - - for _, subSys := range config.NotifySubSystems.ToSlice() { - if targetsOffline, err = fetchSubSysTargets(ctx, cfg, transport, test, returnOnTargetError, subSys, targetList); err != nil { - return targetList, err - } - if targetsOffline { - return targetList, ErrTargetsOffline - } } - return targetList, nil } diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index ad7a11f65..357043fce 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -111,12 +111,16 @@ func (a *AMQPArgs) Validate() error { // AMQPTarget - AMQP target type AMQPTarget struct { + lazyInit lazyInit + id event.TargetID args AMQPArgs conn *amqp.Connection connMutex sync.Mutex store Store loggerOnce logger.LogOnce + + quitCh chan struct{} } // ID - returns TargetID. @@ -126,6 +130,14 @@ func (target *AMQPTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *AMQPTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + + return target.isActive() +} + +func (target *AMQPTarget) isActive() (bool, error) { ch, _, err := target.channel() if err != nil { return false, err @@ -136,11 +148,6 @@ func (target *AMQPTarget) IsActive() (bool, error) { return true, nil } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *AMQPTarget) HasQueueStore() bool { - return target.store != nil -} - func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, error) { var err error var conn *amqp.Connection @@ -256,6 +263,10 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms // Save - saves the events to the store which will be replayed when the amqp connection is active. func (target *AMQPTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } @@ -270,6 +281,10 @@ func (target *AMQPTarget) Save(eventData event.Event) error { // Send - sends event to AMQP. func (target *AMQPTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + ch, confirms, err := target.channel() if err != nil { return err @@ -296,51 +311,49 @@ func (target *AMQPTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *AMQPTarget) Close() error { + close(target.quitCh) if target.conn != nil { return target.conn.Close() } return nil } -// NewAMQPTarget - creates new AMQP target. -func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*AMQPTarget, error) { - var conn *amqp.Connection - var err error +func (target *AMQPTarget) init() error { + return target.lazyInit.Do(target.initAMQP) +} - var store Store - - target := &AMQPTarget{ - id: event.TargetID{ID: id, Name: "amqp"}, - args: args, - loggerOnce: loggerOnce, - } - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } - - conn, err = amqp.Dial(args.URL.String()) +func (target *AMQPTarget) initAMQP() error { + conn, err := amqp.Dial(target.args.URL.String()) if err != nil { - if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + if IsConnRefusedErr(err) || IsConnResetErr(err) { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err } + return err } target.conn = conn - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) +// NewAMQPTarget - creates new AMQP target. +func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err) + } } - return target, nil + return &AMQPTarget{ + id: event.TargetID{ID: id, Name: "amqp"}, + args: args, + loggerOnce: loggerOnce, + store: store, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 610216a0a..4ccdc6cc4 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -152,11 +152,14 @@ func (a ElasticsearchArgs) Validate() error { // ElasticsearchTarget - Elasticsearch target. type ElasticsearchTarget struct { + lazyInit lazyInit + id event.TargetID args ElasticsearchArgs client esClient store Store loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -164,13 +167,15 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *ElasticsearchTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *ElasticsearchTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *ElasticsearchTarget) isActive() (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -184,6 +189,10 @@ func (target *ElasticsearchTarget) IsActive() (bool, error) { // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. func (target *ElasticsearchTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } @@ -246,6 +255,10 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to Elasticsearch. func (target *ElasticsearchTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -277,6 +290,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *ElasticsearchTarget) Close() error { + close(target.quitCh) if target.client != nil { // Stops the background processes that the client is running. target.client.stop() @@ -319,42 +333,46 @@ func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error return nil } -// NewElasticsearchTarget - creates new Elasticsearch target. -func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*ElasticsearchTarget, error) { - target := &ElasticsearchTarget{ - id: event.TargetID{ID: id, Name: "elasticsearch"}, - args: args, - loggerOnce: loggerOnce, - } - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) - target.store = NewQueueStore(queueDir, args.QueueLimit) - if err := target.store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - } +func (target *ElasticsearchTarget) init() error { + return target.lazyInit.Do(target.initElasticsearch) +} +func (target *ElasticsearchTarget) initElasticsearch() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err := target.checkAndInitClient(ctx) if err != nil { - if target.store == nil || err != errNotConnected { + if err != errNotConnected { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + } + return err + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewElasticsearchTarget - creates new Elasticsearch target. +func NewElasticsearchTarget(id string, args ElasticsearchArgs, loggerOnce logger.LogOnce) (*ElasticsearchTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of Elasticsearch `%s`: %w", id, err) } } - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - - return target, nil + return &ElasticsearchTarget{ + id: event.TargetID{ID: id, Name: "elasticsearch"}, + args: args, + store: store, + loggerOnce: loggerOnce, + quitCh: make(chan struct{}), + }, nil } // ES Client definitions and methods diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 718dea35d..7ed59f200 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -23,6 +23,7 @@ import ( "crypto/x509" "encoding/json" "errors" + "fmt" "net" "net/url" "os" @@ -122,12 +123,15 @@ func (k KafkaArgs) Validate() error { // KafkaTarget - Kafka target. type KafkaTarget struct { + lazyInit lazyInit + id event.TargetID args KafkaArgs producer sarama.SyncProducer config *sarama.Config store Store loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -135,13 +139,15 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *KafkaTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *KafkaTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *KafkaTarget) isActive() (bool, error) { if !target.args.pingBrokers() { return false, errNotConnected } @@ -150,10 +156,14 @@ func (target *KafkaTarget) IsActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -189,8 +199,12 @@ func (target *KafkaTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to Kafka. func (target *KafkaTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + var err error - _, err = target.IsActive() + _, err = target.isActive() if err != nil { return err } @@ -234,6 +248,7 @@ func (target *KafkaTarget) Send(eventKey string) error { // Close - closes underneath kafka connection. func (target *KafkaTarget) Close() error { + close(target.quitCh) if target.producer != nil { return target.producer.Close() } @@ -251,21 +266,19 @@ func (k KafkaArgs) pingBrokers() bool { return false } -// NewKafkaTarget - creates new Kafka target with auth credentials. -func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*KafkaTarget, error) { +func (target *KafkaTarget) init() error { + return target.lazyInit.Do(target.initKafka) +} + +func (target *KafkaTarget) initKafka() error { + args := target.args + config := sarama.NewConfig() - - target := &KafkaTarget{ - id: event.TargetID{ID: id, Name: "kafka"}, - args: args, - loggerOnce: loggerOnce, - } - if args.Version != "" { kafkaVersion, err := sarama.ParseKafkaVersion(args.Version) if err != nil { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + return err } config.Version = kafkaVersion } @@ -278,7 +291,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey) if err != nil { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + return err } config.Net.TLS.Enable = args.TLS.Enable @@ -298,33 +311,46 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc brokers = append(brokers, broker.String()) } - var store Store - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } - producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { - if store == nil || err != sarama.ErrOutOfBrokers { + if err != sarama.ErrOutOfBrokers { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err } + target.producer.Close() + return err } target.producer = producer - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected } - return target, nil + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewKafkaTarget - creates new Kafka target with auth credentials. +func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*KafkaTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of Kafka `%s`: %w", id, err) + } + } + + return &KafkaTarget{ + id: event.TargetID{ID: id, Name: "kafka"}, + args: args, + store: store, + loggerOnce: loggerOnce, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/lazyinit.go b/internal/event/target/lazyinit.go new file mode 100644 index 000000000..728005b68 --- /dev/null +++ b/internal/event/target/lazyinit.go @@ -0,0 +1,50 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package target + +import ( + "sync" + "sync/atomic" +) + +// Inspired from Golang sync.Once but it is only marked +// initialized when the provided function returns nil. + +type lazyInit struct { + done uint32 + m sync.Mutex +} + +func (l *lazyInit) Do(f func() error) error { + if atomic.LoadUint32(&l.done) == 0 { + return l.doSlow(f) + } + return nil +} + +func (l *lazyInit) doSlow(f func() error) error { + l.m.Lock() + defer l.m.Unlock() + if atomic.LoadUint32(&l.done) == 0 { + if f() == nil { + // Mark as done only when f() is successful + atomic.StoreUint32(&l.done, 1) + } + } + return nil +} diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index bc9ac528c..72680786c 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -18,7 +18,6 @@ package target import ( - "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -36,7 +35,7 @@ import ( ) const ( - reconnectInterval = 5 // In Seconds + reconnectInterval = 5 * time.Second storePrefix = "minio" ) @@ -107,6 +106,8 @@ func (m MQTTArgs) Validate() error { // MQTTTarget - MQTT target. type MQTTTarget struct { + lazyInit lazyInit + id event.TargetID args MQTTArgs client mqtt.Client @@ -120,13 +121,15 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *MQTTTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *MQTTTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *MQTTTarget) isActive() (bool, error) { if !target.client.IsConnectionOpen() { return false, errNotConnected } @@ -147,7 +150,7 @@ func (target *MQTTTarget) send(eventData event.Event) error { } token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data)) - if !token.WaitTimeout(reconnectInterval * time.Second) { + if !token.WaitTimeout(reconnectInterval) { return errNotConnected } return token.Error() @@ -155,8 +158,12 @@ func (target *MQTTTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to MQTT. func (target *MQTTTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + // Do not send if the connection is not active. - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -182,12 +189,16 @@ func (target *MQTTTarget) Send(eventKey string) error { // Save - saves the events to the store if queuestore is configured, which will // be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } // Do not send if the connection is not active. - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -202,17 +213,12 @@ func (target *MQTTTarget) Close() error { return nil } -// NewMQTTTarget - creates new MQTT target. -func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MQTTTarget, error) { - if args.MaxReconnectInterval == 0 { - // Default interval - // https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115 - args.MaxReconnectInterval = 10 * time.Minute - } +func (target *MQTTTarget) init() error { + return target.lazyInit.Do(target.initMQTT) +} - if args.KeepAlive == 0 { - args.KeepAlive = 10 * time.Second - } +func (target *MQTTTarget) initMQTT() error { + args := target.args // Using hex here, to make sure we avoid 23 // character limit on client_id according to @@ -229,61 +235,57 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}). AddBroker(args.Broker.String()) - client := mqtt.NewClient(options) + target.client = mqtt.NewClient(options) - target := &MQTTTarget{ - id: event.TargetID{ID: id, Name: "mqtt"}, - args: args, - client: client, - quitCh: make(chan struct{}), - loggerOnce: loggerOnce, + token := target.client.Connect() + ok := token.WaitTimeout(reconnectInterval) + if !ok { + return errNotConnected + } + if token.Error() != nil { + return token.Error() } - token := client.Connect() - retryRegister := func() { - for { - retry: - select { - case <-doneCh: - return - case <-target.quitCh: - return - default: - ok := token.WaitTimeout(reconnectInterval * time.Second) - if ok && token.Error() != nil { - target.loggerOnce(context.Background(), - fmt.Errorf("Previous connect failed with %w attempting a reconnect", - token.Error()), - target.ID().String()) - time.Sleep(reconnectInterval * time.Second) - token = client.Connect() - goto retry - } - if ok { - // Successfully connected. - return - } - } - } + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected } + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewMQTTTarget - creates new MQTT target. +func NewMQTTTarget(id string, args MQTTArgs, loggerOnce logger.LogOnce) (*MQTTTarget, error) { + if args.MaxReconnectInterval == 0 { + // Default interval + // https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115 + args.MaxReconnectInterval = 10 * time.Minute + } + + if args.KeepAlive == 0 { + args.KeepAlive = 10 * time.Second + } + + var store Store if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) - target.store = NewQueueStore(queueDir, args.QueueLimit) - if err := target.store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of MQTT `%s`: %w", id, err) } - - if !test { - go retryRegister() - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - } else if token.Wait() && token.Error() != nil { - return target, token.Error() } - return target, nil + + return &MQTTTarget{ + id: event.TargetID{ID: id, Name: "mqtt"}, + args: args, + store: store, + quitCh: make(chan struct{}), + loggerOnce: loggerOnce, + }, nil } diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index 27e928ee5..bae803148 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -145,6 +145,8 @@ func (m MySQLArgs) Validate() error { // MySQLTarget - MySQL target. type MySQLTarget struct { + lazyInit lazyInit + id event.TargetID args MySQLArgs updateStmt *sql.Stmt @@ -154,6 +156,8 @@ type MySQLTarget struct { store Store firstPing bool loggerOnce logger.LogOnce + + quitCh chan struct{} } // ID - returns target ID. @@ -161,24 +165,15 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *MySQLTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *MySQLTarget) IsActive() (bool, error) { - if target.db == nil { - db, sErr := sql.Open("mysql", target.args.DSN) - if sErr != nil { - return false, sErr - } - target.db = db - if target.args.MaxOpenConnections > 0 { - // Set the maximum connections limit - target.db.SetMaxOpenConns(target.args.MaxOpenConnections) - } + if err := target.init(); err != nil { + return false, err } + return target.isActive() +} + +func (target *MySQLTarget) isActive() (bool, error) { if err := target.db.Ping(); err != nil { if IsConnErr(err) { return false, errNotConnected @@ -190,10 +185,14 @@ func (target *MySQLTarget) IsActive() (bool, error) { // Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -244,7 +243,11 @@ func (target *MySQLTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to MySQL. func (target *MySQLTarget) Send(eventKey string) error { - _, err := target.IsActive() + if err := target.init(); err != nil { + return err + } + + _, err := target.isActive() if err != nil { return err } @@ -281,6 +284,7 @@ func (target *MySQLTarget) Send(eventKey string) error { // Close - closes underneath connections to MySQL database. func (target *MySQLTarget) Close() error { + close(target.quitCh) if target.updateStmt != nil { // FIXME: log returned error. ignore time being. _ = target.updateStmt.Close() @@ -333,8 +337,68 @@ func (target *MySQLTarget) executeStmts() error { return nil } +func (target *MySQLTarget) init() error { + return target.lazyInit.Do(target.initMySQL) +} + +func (target *MySQLTarget) initMySQL() error { + args := target.args + + db, err := sql.Open("mysql", args.DSN) + if err != nil { + target.loggerOnce(context.Background(), err, target.ID().String()) + return err + } + target.db = db + + if args.MaxOpenConnections > 0 { + // Set the maximum connections limit + target.db.SetMaxOpenConns(args.MaxOpenConnections) + } + + err = target.db.Ping() + if err != nil { + if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + target.loggerOnce(context.Background(), err, target.ID().String()) + } + } else { + if err = target.executeStmts(); err != nil { + target.loggerOnce(context.Background(), err, target.ID().String()) + } else { + target.firstPing = true + } + } + + if err != nil { + target.db.Close() + return err + } + + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + // NewMySQLTarget - creates new MySQL target. -func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MySQLTarget, error) { +func NewMySQLTarget(id string, args MySQLArgs, loggerOnce logger.LogOnce) (*MySQLTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of MySQL `%s`: %w", id, err) + } + } + if args.DSN == "" { config := mysql.Config{ User: args.User, @@ -349,57 +413,12 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc args.DSN = config.FormatDSN() } - target := &MySQLTarget{ + return &MySQLTarget{ id: event.TargetID{ID: id, Name: "mysql"}, args: args, firstPing: false, + store: store, loggerOnce: loggerOnce, - } - - db, err := sql.Open("mysql", args.DSN) - if err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - target.db = db - - if args.MaxOpenConnections > 0 { - // Set the maximum connections limit - target.db.SetMaxOpenConns(args.MaxOpenConnections) - } - - var store Store - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } - - err = target.db.Ping() - if err != nil { - if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - } else { - if err = target.executeStmts(); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - target.firstPing = true - } - - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - - return target, nil + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index bc118da58..6f03fc9ad 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -23,6 +23,7 @@ import ( "crypto/x509" "encoding/json" "errors" + "fmt" "net/url" "os" "path/filepath" @@ -212,6 +213,8 @@ func (n NATSArgs) connectStan() (stan.Conn, error) { // NATSTarget - NATS target. type NATSTarget struct { + lazyInit lazyInit + id event.TargetID args NATSArgs natsConn *nats.Conn @@ -219,6 +222,7 @@ type NATSTarget struct { jstream nats.JetStream store Store loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -226,13 +230,15 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *NATSTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *NATSTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *NATSTarget) isActive() (bool, error) { var connErr error if target.args.Streaming.Enable { if target.stanConn == nil || target.stanConn.NatsConn() == nil { @@ -270,10 +276,14 @@ func (target *NATSTarget) IsActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Nats connection is active. func (target *NATSTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -311,7 +321,11 @@ func (target *NATSTarget) send(eventData event.Event) error { // Send - sends event to Nats. func (target *NATSTarget) Send(eventKey string) error { - _, err := target.IsActive() + if err := target.init(); err != nil { + return err + } + + _, err := target.isActive() if err != nil { return err } @@ -335,6 +349,7 @@ func (target *NATSTarget) Send(eventKey string) error { // Close - closes underneath connections to NATS server. func (target *NATSTarget) Close() (err error) { + close(target.quitCh) if target.stanConn != nil { // closing the streaming connection does not close the provided NATS connection. if target.stanConn.NatsConn() != nil { @@ -350,66 +365,73 @@ func (target *NATSTarget) Close() (err error) { return nil } -// NewNATSTarget - creates new NATS target. -func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NATSTarget, error) { - var natsConn *nats.Conn - var stanConn stan.Conn - var jstream nats.JetStream +func (target *NATSTarget) init() error { + return target.lazyInit.Do(target.initNATS) +} + +func (target *NATSTarget) initNATS() error { + args := target.args var err error - - var store Store - - target := &NATSTarget{ - id: event.TargetID{ID: id, Name: "nats"}, - args: args, - loggerOnce: loggerOnce, - } - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } - if args.Streaming.Enable { target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID().String()) - + var stanConn stan.Conn stanConn, err = args.connectStan() target.stanConn = stanConn } else { + var natsConn *nats.Conn natsConn, err = args.connectNats() target.natsConn = natsConn } - if err != nil { - if store == nil || err.Error() != nats.ErrNoServers.Error() { + if err.Error() != nats.ErrNoServers.Error() { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err } + return err } if target.natsConn != nil && args.JetStream.Enable { + var jstream nats.JetStream jstream, err = target.natsConn.JetStream() if err != nil { - if store == nil || err.Error() != nats.ErrNoServers.Error() { + if err.Error() != nats.ErrNoServers.Error() { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err } + return err } target.jstream = jstream } - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected } - return target, nil + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewNATSTarget - creates new NATS target. +func NewNATSTarget(id string, args NATSArgs, loggerOnce logger.LogOnce) (*NATSTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of NATS `%s`: %w", id, err) + } + } + + return &NATSTarget{ + id: event.TargetID{ID: id, Name: "nats"}, + args: args, + loggerOnce: loggerOnce, + store: store, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index 8eb7aac32..ecebd077e 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "fmt" "net/url" "os" "path/filepath" @@ -88,12 +89,15 @@ func (n NSQArgs) Validate() error { // NSQTarget - NSQ target. type NSQTarget struct { + lazyInit lazyInit + id event.TargetID args NSQArgs producer *nsq.Producer store Store config *nsq.Config loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -101,13 +105,15 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *NSQTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *NSQTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *NSQTarget) isActive() (bool, error) { if target.producer == nil { producer, err := nsq.NewProducer(target.args.NSQDAddress.String(), target.config) if err != nil { @@ -128,10 +134,14 @@ func (target *NSQTarget) IsActive() (bool, error) { // Save - saves the events to the store which will be replayed when the nsq connection is active. func (target *NSQTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -156,7 +166,11 @@ func (target *NSQTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to NSQ. func (target *NSQTarget) Send(eventKey string) error { - _, err := target.IsActive() + if err := target.init(); err != nil { + return err + } + + _, err := target.isActive() if err != nil { return err } @@ -181,6 +195,7 @@ func (target *NSQTarget) Send(eventKey string) error { // Close - closes underneath connections to NSQD server. func (target *NSQTarget) Close() (err error) { + close(target.quitCh) if target.producer != nil { // this blocks until complete: target.producer.Stop() @@ -188,8 +203,13 @@ func (target *NSQTarget) Close() (err error) { return nil } -// NewNSQTarget - creates new NSQ target. -func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NSQTarget, error) { +func (target *NSQTarget) init() error { + return target.lazyInit.Do(target.initNSQ) +} + +func (target *NSQTarget) initNSQ() error { + args := target.args + config := nsq.NewConfig() if args.TLS.Enable { config.TlsV1 = true @@ -197,47 +217,55 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce lo InsecureSkipVerify: args.TLS.SkipVerify, } } - - var store Store - - target := &NSQTarget{ - id: event.TargetID{ID: id, Name: "nsq"}, - args: args, - config: config, - loggerOnce: loggerOnce, - } - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } + target.config = config producer, err := nsq.NewProducer(args.NSQDAddress.String(), config) if err != nil { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + return err } target.producer = producer - if err := target.producer.Ping(); err != nil { + err = target.producer.Ping() + if err != nil { // To treat "connection refused" errors as errNotConnected. - if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err + } + target.producer.Stop() + return err + } + + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewNSQTarget - creates new NSQ target. +func NewNSQTarget(id string, args NSQArgs, loggerOnce logger.LogOnce) (*NSQTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of NSQ `%s`: %w", id, err) } } - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - - return target, nil + return &NSQTarget{ + id: event.TargetID{ID: id, Name: "nsq"}, + args: args, + loggerOnce: loggerOnce, + store: store, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index 22bb2b3c4..b5eb43cce 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -137,6 +137,8 @@ func (p PostgreSQLArgs) Validate() error { // PostgreSQLTarget - PostgreSQL target. type PostgreSQLTarget struct { + lazyInit lazyInit + id event.TargetID args PostgreSQLArgs updateStmt *sql.Stmt @@ -147,6 +149,7 @@ type PostgreSQLTarget struct { firstPing bool connString string loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -154,24 +157,15 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *PostgreSQLTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *PostgreSQLTarget) IsActive() (bool, error) { - if target.db == nil { - db, err := sql.Open("postgres", target.connString) - if err != nil { - return false, err - } - target.db = db - if target.args.MaxOpenConnections > 0 { - // Set the maximum connections limit - target.db.SetMaxOpenConns(target.args.MaxOpenConnections) - } + if err := target.init(); err != nil { + return false, err } + return target.isActive() +} + +func (target *PostgreSQLTarget) isActive() (bool, error) { if err := target.db.Ping(); err != nil { if IsConnErr(err) { return false, errNotConnected @@ -183,10 +177,14 @@ func (target *PostgreSQLTarget) IsActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. func (target *PostgreSQLTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -241,7 +239,11 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to PostgreSQL. func (target *PostgreSQLTarget) Send(eventKey string) error { - _, err := target.IsActive() + if err := target.init(); err != nil { + return err + } + + _, err := target.isActive() if err != nil { return err } @@ -277,6 +279,7 @@ func (target *PostgreSQLTarget) Send(eventKey string) error { // Close - closes underneath connections to PostgreSQL database. func (target *PostgreSQLTarget) Close() error { + close(target.quitCh) if target.updateStmt != nil { // FIXME: log returned error. ignore time being. _ = target.updateStmt.Close() @@ -329,8 +332,58 @@ func (target *PostgreSQLTarget) executeStmts() error { return nil } +func (target *PostgreSQLTarget) init() error { + return target.lazyInit.Do(target.initPostgreSQL) +} + +func (target *PostgreSQLTarget) initPostgreSQL() error { + args := target.args + + db, err := sql.Open("postgres", target.connString) + if err != nil { + return err + } + target.db = db + + if args.MaxOpenConnections > 0 { + // Set the maximum connections limit + target.db.SetMaxOpenConns(args.MaxOpenConnections) + } + + err = target.db.Ping() + if err != nil { + if !(IsConnRefusedErr(err) || IsConnResetErr(err)) { + target.loggerOnce(context.Background(), err, target.ID().String()) + } + } else { + if err = target.executeStmts(); err != nil { + target.loggerOnce(context.Background(), err, target.ID().String()) + } else { + target.firstPing = true + } + } + + if err != nil { + target.db.Close() + return err + } + + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + // NewPostgreSQLTarget - creates new PostgreSQL target. -func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*PostgreSQLTarget, error) { +func NewPostgreSQLTarget(id string, args PostgreSQLArgs, loggerOnce logger.LogOnce) (*PostgreSQLTarget, error) { params := []string{args.ConnectionString} if args.ConnectionString == "" { params = []string{} @@ -352,57 +405,22 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, } connStr := strings.Join(params, " ") - target := &PostgreSQLTarget{ - id: event.TargetID{ID: id, Name: "postgresql"}, - args: args, - firstPing: false, - connString: connStr, - loggerOnce: loggerOnce, - } - - db, err := sql.Open("postgres", connStr) - if err != nil { - return target, err - } - target.db = db - - if args.MaxOpenConnections > 0 { - // Set the maximum connections limit - target.db.SetMaxOpenConns(args.MaxOpenConnections) - } - var store Store - if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id) store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of PostgreSQL `%s`: %w", id, err) } - target.store = store } - err = target.db.Ping() - if err != nil { - if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - } else { - if err = target.executeStmts(); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - target.firstPing = true - } - - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - - return target, nil + return &PostgreSQLTarget{ + id: event.TargetID{ID: id, Name: "postgresql"}, + args: args, + firstPing: false, + store: store, + connString: connStr, + loggerOnce: loggerOnce, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index 87706c48d..fcdedcae6 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -117,12 +117,15 @@ func (r RedisArgs) validateFormat(c redis.Conn) error { // RedisTarget - Redis target. type RedisTarget struct { + lazyInit lazyInit + id event.TargetID args RedisArgs pool *redis.Pool store Store firstPing bool loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -130,13 +133,15 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *RedisTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *RedisTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *RedisTarget) isActive() (bool, error) { conn := target.pool.Get() defer conn.Close() @@ -152,10 +157,14 @@ func (target *RedisTarget) IsActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. func (target *RedisTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + _, err := target.isActive() if err != nil { return err } @@ -204,6 +213,10 @@ func (target *RedisTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to redis. func (target *RedisTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + conn := target.pool.Get() defer conn.Close() @@ -248,11 +261,58 @@ func (target *RedisTarget) Send(eventKey string) error { // Close - releases the resources used by the pool. func (target *RedisTarget) Close() error { + close(target.quitCh) return target.pool.Close() } +func (target *RedisTarget) init() error { + return target.lazyInit.Do(target.initRedis) +} + +func (target *RedisTarget) initRedis() error { + conn := target.pool.Get() + defer conn.Close() + + _, pingErr := conn.Do("PING") + if pingErr != nil { + if !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { + target.loggerOnce(context.Background(), pingErr, target.ID().String()) + } + return pingErr + } + + if err := target.args.validateFormat(conn); err != nil { + target.loggerOnce(context.Background(), err, target.ID().String()) + return err + } + + target.firstPing = true + + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + // NewRedisTarget - creates new Redis target. -func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*RedisTarget, error) { +func NewRedisTarget(id string, args RedisArgs, loggerOnce logger.LogOnce) (*RedisTarget, error) { + var store Store + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if err := store.Open(); err != nil { + return nil, fmt.Errorf("unable to initialize the queue store of Redis `%s`: %w", id, err) + } + } + pool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 2 * 60 * time.Second, @@ -283,48 +343,12 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc }, } - var store Store - - target := &RedisTarget{ + return &RedisTarget{ id: event.TargetID{ID: id, Name: "redis"}, args: args, pool: pool, + store: store, loggerOnce: loggerOnce, - } - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID().String()) - return target, oErr - } - target.store = store - } - - conn := target.pool.Get() - defer conn.Close() - - _, pingErr := conn.Do("PING") - if pingErr != nil { - if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { - target.loggerOnce(context.Background(), pingErr, target.ID().String()) - return target, pingErr - } - } else { - if err := target.args.validateFormat(conn); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - target.firstPing = true - } - - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) - } - - return target, nil + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/target/store.go b/internal/event/target/store.go index a22ff0d47..07733fcea 100644 --- a/internal/event/target/store.go +++ b/internal/event/target/store.go @@ -47,7 +47,7 @@ type Store interface { } // replayEvents - Reads the events from the store and replays. -func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id event.TargetID) <-chan string { +func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string { eventKeyCh := make(chan string) go func() { @@ -59,7 +59,7 @@ func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce for { names, err := store.List() if err != nil { - loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id.String()) + loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id) } else { for _, name := range names { select { @@ -141,3 +141,12 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str } } } + +func streamEventsFromStore(store Store, target event.Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { + go func() { + // Replays the events from the store. + eventKeyCh := replayEvents(store, doneCh, loggerOnce, target.ID().String()) + // Send events from the store. + sendEvents(target, eventKeyCh, doneCh, loggerOnce) + }() +} diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index cf794a0fb..c5108d9d1 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -90,25 +90,31 @@ func (w WebhookArgs) Validate() error { // WebhookTarget - Webhook target. type WebhookTarget struct { + lazyInit lazyInit + id event.TargetID args WebhookArgs + transport *http.Transport httpClient *http.Client store Store loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. -func (target WebhookTarget) ID() event.TargetID { +func (target *WebhookTarget) ID() event.TargetID { return target.id } -// HasQueueStore - Checks if the queueStore has been configured for the target -func (target *WebhookTarget) HasQueueStore() bool { - return target.store != nil -} - // IsActive - Return true if target is up and active func (target *WebhookTarget) IsActive() (bool, error) { + if err := target.init(); err != nil { + return false, err + } + return target.isActive() +} + +func (target *WebhookTarget) isActive() (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -143,6 +149,10 @@ func (target *WebhookTarget) IsActive() (bool, error) { // Save - saves the events to the store if queuestore is configured, // which will be replayed when the webhook connection is active. func (target *WebhookTarget) Save(eventData event.Event) error { + if err := target.init(); err != nil { + return err + } + if target.store != nil { return target.store.Put(eventData) } @@ -205,6 +215,10 @@ func (target *WebhookTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to webhook. func (target *WebhookTarget) Send(eventKey string) error { + if err := target.init(); err != nil { + return err + } + eventData, eErr := target.store.Get(eventKey) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() @@ -228,52 +242,60 @@ func (target *WebhookTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *WebhookTarget) Close() error { + close(target.quitCh) return nil } -// NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport, test bool) (*WebhookTarget, error) { - var store Store - target := &WebhookTarget{ - id: event.TargetID{ID: id, Name: "webhook"}, - args: args, - loggerOnce: loggerOnce, - } +func (target *WebhookTarget) init() error { + return target.lazyInit.Do(target.initWebhook) +} - if target.args.ClientCert != "" && target.args.ClientKey != "" { - manager, err := certs.NewManager(ctx, target.args.ClientCert, target.args.ClientKey, tls.LoadX509KeyPair) +// Only called from init() +func (target *WebhookTarget) initWebhook() error { + args := target.args + transport := target.transport + + if args.ClientCert != "" && args.ClientKey != "" { + manager, err := certs.NewManager(context.Background(), args.ClientCert, args.ClientKey, tls.LoadX509KeyPair) if err != nil { - return target, err + return err } manager.ReloadOnSignal(syscall.SIGHUP) // allow reloads upon SIGHUP transport.TLSClientConfig.GetClientCertificate = manager.GetClientCertificate } target.httpClient = &http.Client{Transport: transport} + yes, err := target.isActive() + if err != nil { + return err + } + if !yes { + return errNotConnected + } + + if target.store != nil { + streamEventsFromStore(target.store, target, target.quitCh, target.loggerOnce) + } + return nil +} + +// NewWebhookTarget - creates new Webhook target. +func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport) (*WebhookTarget, error) { + var store Store if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) store = NewQueueStore(queueDir, args.QueueLimit) if err := store.Open(); err != nil { - target.loggerOnce(context.Background(), err, target.ID().String()) - return target, err - } - target.store = store - } - - _, err := target.IsActive() - if err != nil { - if target.store == nil || err != errNotConnected { - target.loggerOnce(ctx, err, target.ID().String()) - return target, err + return nil, fmt.Errorf("unable to initialize the queue store of Webhook `%s`: %w", id, err) } } - if target.store != nil && !test { - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, ctx.Done(), target.loggerOnce, target.ID()) - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, ctx.Done(), target.loggerOnce) - } - - return target, nil + return &WebhookTarget{ + id: event.TargetID{ID: id, Name: "webhook"}, + args: args, + loggerOnce: loggerOnce, + transport: transport, + store: store, + quitCh: make(chan struct{}), + }, nil } diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index 40ab94dc4..b998ee2dc 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -35,7 +35,6 @@ type Target interface { Save(Event) error Send(string) error Close() error - HasQueueStore() bool } // TargetList - holds list of targets indexed by target ID. diff --git a/internal/event/targetlist_test.go b/internal/event/targetlist_test.go index eaac18fd3..1958611ff 100644 --- a/internal/event/targetlist_test.go +++ b/internal/event/targetlist_test.go @@ -72,9 +72,9 @@ func (target ExampleTarget) IsActive() (bool, error) { return false, errors.New("not connected to target server/service") } -// HasQueueStore - No-Op. Added for interface compatibility -func (target ExampleTarget) HasQueueStore() bool { - return false +// FlushQueueStore - No-Op. Added for interface compatibility +func (target ExampleTarget) FlushQueueStore() error { + return nil } func TestTargetListAdd(t *testing.T) {