minio/internal/config/notify/parse.go
Praveen raj Mani 261111e728
Kafka notify: support batched commits for queue store (#20377)
The items will be saved per target batch and will
be committed to the queue store when the batch is full

Also, periodically commit the batched items to the queue store
based on configured commit_timeout; default is 30s;

Bonus: compress queue store multi writes
2024-09-06 16:06:30 -07:00

1786 lines
47 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package notify
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/event/target"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v3/env"
xnet "github.com/minio/pkg/v3/net"
)
const (
	// formatNamespace is the default value of the "format" key for the
	// MySQL, Postgres and Redis targets declared in this file.
	formatNamespace = "namespace"

	// logSubsys is the subsystem name passed to the logger by logOnceIf.
	logSubsys = "notify"
)
// logOnceIf forwards err to logger.LogOnceIf under the notify subsystem.
// ctx, id and errKind are passed through unchanged. It has the shape expected
// by the target constructors used in this file, which receive it as their
// logging callback.
func logOnceIf(ctx context.Context, err error, id string, errKind ...any) {
	logger.LogOnceIf(ctx, logSubsys, err, id, errKind...)
}
// ErrTargetsOffline indicates that one or more notification targets are not
// active. TestSubSysNotificationTargets returns it, wrapped together with the
// target ID, when a target that is required to be online reports itself as
// inactive without returning an error of its own.
var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets")
// TestSubSysNotificationTargets validates the configuration of subSys,
// constructs all of its enabled targets and probes each one with IsActive.
//
// Targets whose ID is listed (with a true value) in the map stored in ctx
// under config.ContextKeyForTargetFromConfig must be active: an error from
// IsActive, or an inactive result (reported as ErrTargetsOffline), is returned
// wrapped with the target ID. All other targets are probed only for the side
// effect of the call and their result is ignored.
//
// Every constructed target is closed before the function returns.
func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) error {
	if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
		return err
	}

	targetList, err := fetchSubSysTargets(ctx, cfg, subSys, transport)
	if err != nil {
		return err
	}
	for _, tgt := range targetList {
		// Deliberately deferred to function exit: the targets are still
		// needed by the probing loop below.
		defer tgt.Close()
	}

	// A failed assertion leaves the map nil; reading from a nil map simply
	// yields false, meaning "no target is required to be online".
	mustBeOnline, _ := ctx.Value(config.ContextKeyForTargetFromConfig).(map[string]bool)

	for _, tgt := range targetList {
		if !mustBeOnline[tgt.ID().ID] {
			// Only call IsActive so the target gets initialized; whether it
			// is online or offline does not matter here.
			_, _ = tgt.IsActive()
			continue
		}

		// This target is expected to be online.
		active, err := tgt.IsActive()
		switch {
		case err != nil:
			return fmt.Errorf("error (%s): %w", tgt.ID(), err)
		case !active:
			return fmt.Errorf("error (%s): %w", tgt.ID(), ErrTargetsOffline)
		}
	}
	return nil
}
// fetchSubSysTargets constructs the enabled notification targets of subSys.
//
// The stored keys of subSys are validated first. The matching GetNotify*
// parser then produces the per-target arguments, and a target is created for
// every entry whose Enable flag is set. A subSys that matches none of the
// cases yields no targets and a nil error.
//
// The root CA pool handed to the Kafka, MQTT and NATS targets is taken from
// transport.TLSClientConfig; when transport or its TLS configuration is nil
// the pool is nil instead of causing a nil-pointer panic. The Elasticsearch
// and Webhook code paths receive transport itself, unchanged.
//
// On any failure the targets that were already created during this call are
// closed before the error is returned, so that a half-built set does not leak.
func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) (targets []event.Target, err error) {
	if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
		return nil, err
	}

	var rootCAs *x509.CertPool
	if transport != nil && transport.TLSClientConfig != nil {
		rootCAs = transport.TLSClientConfig.RootCAs
	}

	// created accumulates the targets separately from the named result,
	// because every error return below resets that result to nil before the
	// deferred cleanup runs.
	var created []event.Target
	defer func() {
		if err == nil {
			return
		}
		for _, t := range created {
			// The construction error is what the caller needs to see; a
			// failure while closing adds nothing actionable.
			t.Close()
		}
	}()

	switch subSys {
	case config.NotifyAMQPSubSys:
		amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range amqpTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewAMQPTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyESSubSys:
		esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport)
		if err != nil {
			return nil, err
		}
		for id, args := range esTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewElasticsearchTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyKafkaSubSys:
		kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range kafkaTargets {
			if !args.Enable {
				continue
			}
			args.TLS.RootCAs = rootCAs
			t, err := target.NewKafkaTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyMQTTSubSys:
		mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], rootCAs)
		if err != nil {
			return nil, err
		}
		for id, args := range mqttTargets {
			if !args.Enable {
				continue
			}
			args.RootCAs = rootCAs
			t, err := target.NewMQTTTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyMySQLSubSys:
		mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range mysqlTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewMySQLTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyNATSSubSys:
		natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], rootCAs)
		if err != nil {
			return nil, err
		}
		for id, args := range natsTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewNATSTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyNSQSubSys:
		nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range nsqTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewNSQTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyPostgresSubSys:
		postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range postgresTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewPostgreSQLTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyRedisSubSys:
		redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
		if err != nil {
			return nil, err
		}
		for id, args := range redisTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewRedisTarget(id, args, logOnceIf)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	case config.NotifyWebhookSubSys:
		webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
		if err != nil {
			return nil, err
		}
		for id, args := range webhookTargets {
			if !args.Enable {
				continue
			}
			t, err := target.NewWebhookTarget(ctx, id, args, logOnceIf, transport)
			if err != nil {
				return nil, err
			}
			created = append(created, t)
		}
	}
	return created, nil
}
// FetchEnabledTargets builds a target list containing the enabled targets of
// every subsystem in config.NotifySubSystems. The first error encountered,
// whether while constructing a subsystem's targets or while adding a target
// to the list, is returned together with a nil list.
func FetchEnabledTargets(ctx context.Context, cfg config.Config, transport *http.Transport) (_ *event.TargetList, err error) {
	list := event.NewTargetList(ctx)
	for _, name := range config.NotifySubSystems.ToSlice() {
		subSysTargets, fetchErr := fetchSubSysTargets(ctx, cfg, name, transport)
		if fetchErr != nil {
			return nil, fetchErr
		}
		for i := range subSysTargets {
			if addErr := list.Add(subSysTargets[i]); addErr != nil {
				return nil, addErr
			}
		}
	}
	return list, nil
}
// DefaultNotificationKVS maps each notification subsystem name to the default
// key/value set of that subsystem. checkValidNotificationKeysForSubSys uses
// the entry as the reference set when validating an enabled target;
// subsystems without an entry here are not validated by that function.
var (
DefaultNotificationKVS = map[string]config.KVS{
config.NotifyAMQPSubSys: DefaultAMQPKVS,
config.NotifyKafkaSubSys: DefaultKafkaKVS,
config.NotifyMQTTSubSys: DefaultMQTTKVS,
config.NotifyMySQLSubSys: DefaultMySQLKVS,
config.NotifyNATSSubSys: DefaultNATSKVS,
config.NotifyNSQSubSys: DefaultNSQKVS,
config.NotifyPostgresSubSys: DefaultPostgresKVS,
config.NotifyRedisSubSys: DefaultRedisKVS,
config.NotifyWebhookSubSys: DefaultWebhookKVS,
config.NotifyESSubSys: DefaultESKVS,
}
)
// checkValidNotificationKeysForSubSys validates the keys of every enabled
// target in tgt against the defaults registered for subSys in
// DefaultNotificationKVS.
//
// Subsystems without registered defaults, and targets whose enable key is
// missing or not set to config.EnableOn, are not checked. The first error
// returned by config.CheckValidKeys is returned as is.
func checkValidNotificationKeysForSubSys(subSys string, tgt map[string]config.KVS) error {
	defaults, known := DefaultNotificationKVS[subSys]
	if !known {
		return nil
	}
	for name, kvs := range tgt {
		if state, found := kvs.Lookup(config.Enable); !found || state != config.EnableOn {
			continue
		}
		// Named targets are identified as <subSys><separator><name>; the
		// default target is identified by the bare subsystem name.
		qualified := subSys
		if name != config.Default {
			qualified += config.SubSystemSeparator + name
		}
		if err := config.CheckValidKeys(qualified, kvs, defaults); err != nil {
			return err
		}
	}
	return nil
}
// DefaultKafkaKVS holds the default key/value pairs of a 'kafka' notification
// target. GetNotifyKafka merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem.
var DefaultKafkaKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.KafkaTopic, Value: ""},
	{Key: target.KafkaBrokers, Value: ""},
	{Key: target.KafkaSASLUsername, Value: ""},
	{Key: target.KafkaSASLPassword, Value: ""},
	{Key: target.KafkaSASLMechanism, Value: "plain"},
	{Key: target.KafkaClientTLSCert, Value: ""},
	{Key: target.KafkaClientTLSKey, Value: ""},
	{Key: target.KafkaTLSClientAuth, Value: "0"},
	{Key: target.KafkaSASL, Value: config.EnableOff},
	{Key: target.KafkaTLS, Value: config.EnableOff},
	{Key: target.KafkaTLSSkipVerify, Value: config.EnableOff},
	{Key: target.KafkaQueueLimit, Value: "0"},
	{Key: target.KafkaQueueDir, Value: ""},
	{Key: target.KafkaVersion, Value: ""},
	{Key: target.KafkaBatchSize, Value: "0"},
	{Key: target.KafkaBatchCommitTimeout, Value: "0s"},
	{Key: target.KafkaCompressionCodec, Value: ""},
	{Key: target.KafkaCompressionLevel, Value: ""},
}
// GetNotifyKafka returns the enabled 'kafka' notification targets, keyed by
// target name.
//
// kafkaKVS is merged with DefaultKafkaKVS via config.Merge. For every
// resulting target each setting is read from the environment first and falls
// back to the stored key/value. The environment variable is the base name
// for the default target and the base name joined with config.Default and the
// target name otherwise. Targets that are not enabled are skipped.
//
// An error is returned when the enable flag, a broker address, the queue
// limit, the TLS client-auth type, the batch size, the batch commit timeout
// or the compression level cannot be parsed, when the broker list is empty,
// or when KafkaArgs.Validate rejects the assembled arguments.
//
// An empty compression level leaves the level at 0. A non-empty value must be
// an integer; it is rejected rather than silently replaced by 0.
func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) {
	kafkaTargets := make(map[string]target.KafkaArgs)
	for k, kv := range config.Merge(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvKafkaEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		kafkaBrokers := lookup(target.EnvKafkaBrokers, target.KafkaBrokers)
		if len(kafkaBrokers) == 0 {
			return nil, config.Errorf("kafka 'brokers' cannot be empty")
		}
		var brokers []xnet.Host
		for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
			host, err := xnet.ParseHost(s)
			if err != nil {
				return nil, err
			}
			brokers = append(brokers, *host)
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvKafkaQueueLimit, target.KafkaQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		clientAuth, err := strconv.Atoi(lookup(target.EnvKafkaTLSClientAuth, target.KafkaTLSClientAuth))
		if err != nil {
			return nil, err
		}

		// Parsed with a 32-bit limit so the conversion to uint32 below cannot
		// truncate.
		batchSize, err := strconv.ParseUint(lookup(target.EnvKafkaBatchSize, target.KafkaBatchSize), 10, 32)
		if err != nil {
			return nil, err
		}

		batchCommitTimeout, err := time.ParseDuration(lookup(target.EnvKafkaBatchCommitTimeout, target.KafkaBatchCommitTimeout))
		if err != nil {
			return nil, err
		}

		kafkaArgs := target.KafkaArgs{
			Enable:             enabled,
			Brokers:            brokers,
			Topic:              lookup(target.EnvKafkaTopic, target.KafkaTopic),
			QueueDir:           lookup(target.EnvKafkaQueueDir, target.KafkaQueueDir),
			QueueLimit:         queueLimit,
			Version:            lookup(target.EnvKafkaVersion, target.KafkaVersion),
			BatchSize:          uint32(batchSize),
			BatchCommitTimeout: batchCommitTimeout,
		}

		kafkaArgs.TLS.Enable = lookup(target.EnvKafkaTLS, target.KafkaTLS) == config.EnableOn
		kafkaArgs.TLS.SkipVerify = lookup(target.EnvKafkaTLSSkipVerify, target.KafkaTLSSkipVerify) == config.EnableOn
		kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth)
		kafkaArgs.TLS.ClientTLSCert = lookup(target.EnvKafkaClientTLSCert, target.KafkaClientTLSCert)
		kafkaArgs.TLS.ClientTLSKey = lookup(target.EnvKafkaClientTLSKey, target.KafkaClientTLSKey)

		kafkaArgs.Producer.Compression = lookup(target.EnvKafkaProducerCompressionCodec, target.KafkaCompressionCodec)
		// The default level is the empty string, which keeps the zero value.
		// Anything else has to be a valid integer.
		if level := lookup(target.EnvKafkaProducerCompressionLevel, target.KafkaCompressionLevel); level != "" {
			compressionLevel, err := strconv.Atoi(level)
			if err != nil {
				return nil, fmt.Errorf("kafka compression level %q: %w", level, err)
			}
			kafkaArgs.Producer.CompressionLevel = compressionLevel
		}

		kafkaArgs.SASL.Enable = lookup(target.EnvKafkaSASLEnable, target.KafkaSASL) == config.EnableOn
		kafkaArgs.SASL.User = lookup(target.EnvKafkaSASLUsername, target.KafkaSASLUsername)
		kafkaArgs.SASL.Password = lookup(target.EnvKafkaSASLPassword, target.KafkaSASLPassword)
		kafkaArgs.SASL.Mechanism = lookup(target.EnvKafkaSASLMechanism, target.KafkaSASLMechanism)

		if err = kafkaArgs.Validate(); err != nil {
			return nil, err
		}

		kafkaTargets[k] = kafkaArgs
	}

	return kafkaTargets, nil
}
// DefaultMQTTKVS holds the default key/value pairs of an 'mqtt' notification
// target. GetNotifyMQTT merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem.
var DefaultMQTTKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.MqttBroker, Value: ""},
	{Key: target.MqttTopic, Value: ""},
	{Key: target.MqttPassword, Value: ""},
	{Key: target.MqttUsername, Value: ""},
	{Key: target.MqttQoS, Value: "0"},
	{Key: target.MqttKeepAliveInterval, Value: "0s"},
	{Key: target.MqttReconnectInterval, Value: "0s"},
	{Key: target.MqttQueueDir, Value: ""},
	{Key: target.MqttQueueLimit, Value: "0"},
}
// GetNotifyMQTT returns the enabled 'mqtt' notification targets, keyed by
// target name.
//
// mqttKVS is merged with DefaultMQTTKVS via config.Merge. Each setting is
// read from the environment first (base variable name, joined with
// config.Default and the target name for non-default targets) and falls back
// to the stored key/value. Disabled targets are skipped. rootCAs is copied
// into the arguments of every returned target.
//
// An error is returned when the enable flag, broker URL, reconnect interval,
// keep-alive interval, queue limit or QoS cannot be parsed, or when
// MQTTArgs.Validate rejects the assembled arguments.
func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) {
	result := make(map[string]target.MQTTArgs)
	for k, kv := range config.Merge(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvMQTTEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		brokerURL, err := xnet.ParseURL(lookup(target.EnvMQTTBroker, target.MqttBroker))
		if err != nil {
			return nil, err
		}

		reconnectInterval, err := time.ParseDuration(lookup(target.EnvMQTTReconnectInterval, target.MqttReconnectInterval))
		if err != nil {
			return nil, err
		}

		keepAliveInterval, err := time.ParseDuration(lookup(target.EnvMQTTKeepAliveInterval, target.MqttKeepAliveInterval))
		if err != nil {
			return nil, err
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvMQTTQueueLimit, target.MqttQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		// Parsed with an 8-bit limit so the conversion to byte below cannot
		// truncate.
		qos, err := strconv.ParseUint(lookup(target.EnvMQTTQoS, target.MqttQoS), 10, 8)
		if err != nil {
			return nil, err
		}

		mqttArgs := target.MQTTArgs{
			Enable:               enabled,
			Broker:               *brokerURL,
			Topic:                lookup(target.EnvMQTTTopic, target.MqttTopic),
			QoS:                  byte(qos),
			User:                 lookup(target.EnvMQTTUsername, target.MqttUsername),
			Password:             lookup(target.EnvMQTTPassword, target.MqttPassword),
			MaxReconnectInterval: reconnectInterval,
			KeepAlive:            keepAliveInterval,
			RootCAs:              rootCAs,
			QueueDir:             lookup(target.EnvMQTTQueueDir, target.MqttQueueDir),
			QueueLimit:           queueLimit,
		}

		if err = mqttArgs.Validate(); err != nil {
			return nil, err
		}
		result[k] = mqttArgs
	}
	return result, nil
}
// DefaultMySQLKVS holds the default key/value pairs of a 'mysql' notification
// target. GetNotifyMySQL merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem.
var DefaultMySQLKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.MySQLFormat, Value: formatNamespace},
	{Key: target.MySQLDSNString, Value: ""},
	{Key: target.MySQLTable, Value: ""},
	{Key: target.MySQLQueueDir, Value: ""},
	{Key: target.MySQLQueueLimit, Value: "0"},
	{Key: target.MySQLMaxOpenConnections, Value: "2"},
}
// GetNotifyMySQL returns the enabled 'mysql' notification targets, keyed by
// target name.
//
// mysqlKVS is merged with DefaultMySQLKVS via config.Merge. Each setting is
// read from the environment first (base variable name, joined with
// config.Default and the target name for non-default targets) and falls back
// to the stored key/value. Disabled targets are skipped.
//
// An error is returned when the enable flag, queue limit or maximum number of
// open connections cannot be parsed, or when MySQLArgs.Validate rejects the
// assembled arguments.
func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) {
	result := make(map[string]target.MySQLArgs)
	for k, kv := range config.Merge(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvMySQLEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvMySQLQueueLimit, target.MySQLQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		maxOpenConnections, err := strconv.Atoi(lookup(target.EnvMySQLMaxOpenConnections, target.MySQLMaxOpenConnections))
		if err != nil {
			return nil, err
		}

		mysqlArgs := target.MySQLArgs{
			Enable:             enabled,
			Format:             lookup(target.EnvMySQLFormat, target.MySQLFormat),
			DSN:                lookup(target.EnvMySQLDSNString, target.MySQLDSNString),
			Table:              lookup(target.EnvMySQLTable, target.MySQLTable),
			QueueDir:           lookup(target.EnvMySQLQueueDir, target.MySQLQueueDir),
			QueueLimit:         queueLimit,
			MaxOpenConnections: maxOpenConnections,
		}
		if err = mysqlArgs.Validate(); err != nil {
			return nil, err
		}
		result[k] = mysqlArgs
	}
	return result, nil
}
// DefaultNATSKVS holds the default key/value pairs of a 'nats' notification
// target. GetNotifyNATS merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem. The four streaming entries are marked HiddenIfEmpty.
var DefaultNATSKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.NATSAddress, Value: ""},
	{Key: target.NATSSubject, Value: ""},
	{Key: target.NATSUsername, Value: ""},
	{Key: target.NATSPassword, Value: ""},
	{Key: target.NATSToken, Value: ""},
	{Key: target.NATSTLS, Value: config.EnableOff},
	{Key: target.NATSTLSSkipVerify, Value: config.EnableOff},
	{Key: target.NATSCertAuthority, Value: ""},
	{Key: target.NATSClientCert, Value: ""},
	{Key: target.NATSClientKey, Value: ""},
	{Key: target.NATSPingInterval, Value: "0"},
	{Key: target.NATSJetStream, Value: config.EnableOff},
	{Key: target.NATSStreaming, Value: config.EnableOff, HiddenIfEmpty: true},
	{Key: target.NATSStreamingAsync, Value: config.EnableOff, HiddenIfEmpty: true},
	{Key: target.NATSStreamingMaxPubAcksInFlight, Value: "0", HiddenIfEmpty: true},
	{Key: target.NATSStreamingClusterID, Value: "", HiddenIfEmpty: true},
	{Key: target.NATSQueueDir, Value: ""},
	{Key: target.NATSQueueLimit, Value: "0"},
}
// GetNotifyNATS returns the enabled 'nats' notification targets, keyed by
// target name.
//
// natsKVS is merged with DefaultNATSKVS via config.Merge. Each setting is
// read from the environment first (base variable name, joined with
// config.Default and the target name for non-default targets) and falls back
// to the stored key/value. Disabled targets are skipped. rootCAs is copied
// into the arguments of every returned target.
//
// The streaming settings are only read when streaming is switched on.
//
// An error is returned when the enable flag, address, ping interval, queue
// limit or (with streaming on) the maximum number of in-flight publish acks
// cannot be parsed, or when NATSArgs.Validate rejects the assembled
// arguments.
func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.NATSArgs, error) {
	result := make(map[string]target.NATSArgs)
	for k, kv := range config.Merge(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvNATSEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		address, err := xnet.ParseHost(lookup(target.EnvNATSAddress, target.NATSAddress))
		if err != nil {
			return nil, err
		}

		pingInterval, err := strconv.ParseInt(lookup(target.EnvNATSPingInterval, target.NATSPingInterval), 10, 64)
		if err != nil {
			return nil, err
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvNATSQueueLimit, target.NATSQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		natsArgs := target.NATSArgs{
			// Only enabled targets reach this point.
			Enable:   true,
			Address:  *address,
			Subject:  lookup(target.EnvNATSSubject, target.NATSSubject),
			Username: lookup(target.EnvNATSUsername, target.NATSUsername),
			// NOTE(review): the same constant serves as both the environment
			// variable base and the config key here, unlike every other
			// setting - confirm against its declaration in the target package.
			UserCredentials: lookup(target.NATSUserCredentials, target.NATSUserCredentials),
			Password:        lookup(target.EnvNATSPassword, target.NATSPassword),
			CertAuthority:   lookup(target.EnvNATSCertAuthority, target.NATSCertAuthority),
			ClientCert:      lookup(target.EnvNATSClientCert, target.NATSClientCert),
			ClientKey:       lookup(target.EnvNATSClientKey, target.NATSClientKey),
			Token:           lookup(target.EnvNATSToken, target.NATSToken),
			TLS:             lookup(target.EnvNATSTLS, target.NATSTLS) == config.EnableOn,
			TLSSkipVerify:   lookup(target.EnvNATSTLSSkipVerify, target.NATSTLSSkipVerify) == config.EnableOn,
			PingInterval:    pingInterval,
			QueueDir:        lookup(target.EnvNATSQueueDir, target.NATSQueueDir),
			QueueLimit:      queueLimit,
			RootCAs:         rootCAs,
		}
		natsArgs.JetStream.Enable = lookup(target.EnvNATSJetStream, target.NATSJetStream) == config.EnableOn

		if lookup(target.EnvNATSStreaming, target.NATSStreaming) == config.EnableOn {
			maxInflight, err := strconv.Atoi(lookup(target.EnvNATSStreamingMaxPubAcksInFlight, target.NATSStreamingMaxPubAcksInFlight))
			if err != nil {
				return nil, err
			}
			natsArgs.Streaming.Enable = true
			natsArgs.Streaming.ClusterID = lookup(target.EnvNATSStreamingClusterID, target.NATSStreamingClusterID)
			natsArgs.Streaming.Async = lookup(target.EnvNATSStreamingAsync, target.NATSStreamingAsync) == config.EnableOn
			natsArgs.Streaming.MaxPubAcksInflight = maxInflight
		}

		if err = natsArgs.Validate(); err != nil {
			return nil, err
		}

		result[k] = natsArgs
	}
	return result, nil
}
// DefaultNSQKVS holds the default key/value pairs of an 'nsq' notification
// target. GetNotifyNSQ merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem.
var DefaultNSQKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.NSQAddress, Value: ""},
	{Key: target.NSQTopic, Value: ""},
	{Key: target.NSQTLS, Value: config.EnableOff},
	{Key: target.NSQTLSSkipVerify, Value: config.EnableOff},
	{Key: target.NSQQueueDir, Value: ""},
	{Key: target.NSQQueueLimit, Value: "0"},
}
// GetNotifyNSQ returns the enabled 'nsq' notification targets, keyed by
// target name.
//
// nsqKVS is merged with DefaultNSQKVS via config.Merge. Each setting is read
// from the environment first (base variable name, joined with config.Default
// and the target name for non-default targets) and falls back to the stored
// key/value. Disabled targets are skipped.
//
// An error is returned when the enable flag, nsqd address or queue limit
// cannot be parsed, or when NSQArgs.Validate rejects the assembled arguments.
func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) {
	result := make(map[string]target.NSQArgs)
	for k, kv := range config.Merge(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvNSQEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		nsqdAddress, err := xnet.ParseHost(lookup(target.EnvNSQAddress, target.NSQAddress))
		if err != nil {
			return nil, err
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvNSQQueueLimit, target.NSQQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		nsqArgs := target.NSQArgs{
			Enable:      enabled,
			NSQDAddress: *nsqdAddress,
			Topic:       lookup(target.EnvNSQTopic, target.NSQTopic),
			QueueDir:    lookup(target.EnvNSQQueueDir, target.NSQQueueDir),
			QueueLimit:  queueLimit,
		}
		nsqArgs.TLS.Enable = lookup(target.EnvNSQTLS, target.NSQTLS) == config.EnableOn
		nsqArgs.TLS.SkipVerify = lookup(target.EnvNSQTLSSkipVerify, target.NSQTLSSkipVerify) == config.EnableOn

		if err = nsqArgs.Validate(); err != nil {
			return nil, err
		}

		result[k] = nsqArgs
	}
	return result, nil
}
// DefaultPostgresKVS holds the default key/value pairs of a 'postgres'
// notification target. GetNotifyPostgres merges them into the stored
// configuration, and DefaultNotificationKVS registers them as the valid key
// set for the subsystem.
var DefaultPostgresKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.PostgresFormat, Value: formatNamespace},
	{Key: target.PostgresConnectionString, Value: ""},
	{Key: target.PostgresTable, Value: ""},
	{Key: target.PostgresQueueDir, Value: ""},
	{Key: target.PostgresQueueLimit, Value: "0"},
	{Key: target.PostgresMaxOpenConnections, Value: "2"},
}
// GetNotifyPostgres returns the enabled 'postgres' notification targets,
// keyed by target name.
//
// postgresKVS is merged with DefaultPostgresKVS via config.Merge. Each
// setting is read from the environment first (base variable name, joined with
// config.Default and the target name for non-default targets) and falls back
// to the stored key/value. Disabled targets are skipped.
//
// The queue limit is parsed as an unsigned 64-bit integer, as the other
// targets in this file do. A negative value is rejected instead of wrapping
// around to a very large limit when converted to uint64.
//
// An error is returned when the enable flag, queue limit or maximum number of
// open connections cannot be parsed, or when PostgreSQLArgs.Validate rejects
// the assembled arguments.
func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) {
	psqlTargets := make(map[string]target.PostgreSQLArgs)
	for k, kv := range config.Merge(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) {
		// lookup resolves one setting of target k: environment first, stored
		// key/value second.
		lookup := func(envBase, key string) string {
			name := envBase
			if k != config.Default {
				name = envBase + config.Default + k
			}
			return env.Get(name, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvPostgresEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvPostgresQueueLimit, target.PostgresQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		maxOpenConnections, err := strconv.Atoi(lookup(target.EnvPostgresMaxOpenConnections, target.PostgresMaxOpenConnections))
		if err != nil {
			return nil, err
		}

		psqlArgs := target.PostgreSQLArgs{
			Enable:             enabled,
			Format:             lookup(target.EnvPostgresFormat, target.PostgresFormat),
			ConnectionString:   lookup(target.EnvPostgresConnectionString, target.PostgresConnectionString),
			Table:              lookup(target.EnvPostgresTable, target.PostgresTable),
			QueueDir:           lookup(target.EnvPostgresQueueDir, target.PostgresQueueDir),
			QueueLimit:         queueLimit,
			MaxOpenConnections: maxOpenConnections,
		}
		if err = psqlArgs.Validate(); err != nil {
			return nil, err
		}
		psqlTargets[k] = psqlArgs
	}

	return psqlTargets, nil
}
// DefaultRedisKVS holds the default key/value pairs of a 'redis' notification
// target. GetNotifyRedis merges them into the stored configuration, and
// DefaultNotificationKVS registers them as the valid key set for the
// subsystem.
var DefaultRedisKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.RedisFormat, Value: formatNamespace},
	{Key: target.RedisAddress, Value: ""},
	{Key: target.RedisKey, Value: ""},
	{Key: target.RedisPassword, Value: ""},
	{Key: target.RedisUser, Value: ""},
	{Key: target.RedisQueueDir, Value: ""},
	{Key: target.RedisQueueLimit, Value: "0"},
}
// GetNotifyRedis - returns a map of registered notification 'redis' targets
//
// redisKVS is combined with DefaultRedisKVS through config.Merge and every
// resulting target is inspected. Targets that are not enabled are left out of
// the result. For each setting the environment variable name is the base name
// for the default target, and the base name followed by config.Default and the
// target name otherwise; env.Get is called with the stored value as its second
// argument.
//
// An error is returned as soon as the enable flag, the address, or the queue
// limit of any target fails to parse, or when RedisArgs.Validate rejects the
// assembled arguments.
func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) {
	redisTargets := make(map[string]target.RedisArgs)
	for k, kv := range config.Merge(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) {
		// lookup resolves a single setting of the current target.
		lookup := func(envBase, key string) string {
			if k != config.Default {
				envBase = envBase + config.Default + k
			}
			return env.Get(envBase, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvRedisEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		addr, err := xnet.ParseHost(lookup(target.EnvRedisAddress, target.RedisAddress))
		if err != nil {
			return nil, err
		}

		// Parse as unsigned so a negative limit is rejected instead of
		// wrapping around to a huge uint64 value.
		queueLimit, err := strconv.ParseUint(lookup(target.EnvRedisQueueLimit, target.RedisQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		redisArgs := target.RedisArgs{
			Enable:     enabled,
			Format:     lookup(target.EnvRedisFormat, target.RedisFormat),
			Addr:       *addr,
			Password:   lookup(target.EnvRedisPassword, target.RedisPassword),
			User:       lookup(target.EnvRedisUser, target.RedisUser),
			Key:        lookup(target.EnvRedisKey, target.RedisKey),
			QueueDir:   lookup(target.EnvRedisQueueDir, target.RedisQueueDir),
			QueueLimit: queueLimit,
		}
		if err = redisArgs.Validate(); err != nil {
			return nil, err
		}
		redisTargets[k] = redisArgs
	}
	return redisTargets, nil
}
// DefaultWebhookKVS - default KV for webhook config.
//
// The target is disabled by default, the queue limit is "0" and the endpoint,
// auth token, queue directory, client certificate and client key are empty.
// GetNotifyWebhook passes these defaults to config.Merge together with the
// configured webhook targets.
var DefaultWebhookKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.WebhookEndpoint, Value: ""},
	{Key: target.WebhookAuthToken, Value: ""},
	{Key: target.WebhookQueueLimit, Value: "0"},
	{Key: target.WebhookQueueDir, Value: ""},
	{Key: target.WebhookClientCert, Value: ""},
	{Key: target.WebhookClientKey, Value: ""},
}
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
//
// webhookKVS is combined with DefaultWebhookKVS through config.Merge and every
// resulting target is inspected. Targets that are not enabled are left out of
// the result. For each setting the environment variable name is the base name
// for the default target, and the base name followed by config.Default and the
// target name otherwise; env.Get is called with the stored value as its second
// argument. The supplied transport is stored unchanged in every returned
// WebhookArgs.
//
// An error is returned as soon as the enable flag, the endpoint URL, or the
// queue limit of any target fails to parse, or when WebhookArgs.Validate
// rejects the assembled arguments.
func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) (
	map[string]target.WebhookArgs, error,
) {
	webhookTargets := make(map[string]target.WebhookArgs)
	for k, kv := range config.Merge(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
		// lookup resolves a single setting of the current target.
		lookup := func(envBase, key string) string {
			if k != config.Default {
				envBase = envBase + config.Default + k
			}
			return env.Get(envBase, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvWebhookEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		endpoint, err := xnet.ParseHTTPURL(lookup(target.EnvWebhookEndpoint, target.WebhookEndpoint))
		if err != nil {
			return nil, err
		}

		// Parse as unsigned so a negative limit is rejected instead of
		// wrapping around to a huge uint64 value.
		queueLimit, err := strconv.ParseUint(lookup(target.EnvWebhookQueueLimit, target.WebhookQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		webhookArgs := target.WebhookArgs{
			Enable:     enabled,
			Endpoint:   *endpoint,
			Transport:  transport,
			AuthToken:  lookup(target.EnvWebhookAuthToken, target.WebhookAuthToken),
			QueueDir:   lookup(target.EnvWebhookQueueDir, target.WebhookQueueDir),
			QueueLimit: queueLimit,
			ClientCert: lookup(target.EnvWebhookClientCert, target.WebhookClientCert),
			ClientKey:  lookup(target.EnvWebhookClientKey, target.WebhookClientKey),
		}
		if err = webhookArgs.Validate(); err != nil {
			return nil, err
		}
		webhookTargets[k] = webhookArgs
	}
	return webhookTargets, nil
}
// DefaultESKVS - default KV config for Elasticsearch target.
//
// The target is disabled by default, events use the namespace format, the
// queue limit is "0" and the URL, index, queue directory, username and
// password are empty. GetNotifyES passes these defaults to config.Merge
// together with the configured Elasticsearch targets.
var DefaultESKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.ElasticURL, Value: ""},
	{Key: target.ElasticFormat, Value: formatNamespace},
	{Key: target.ElasticIndex, Value: ""},
	{Key: target.ElasticQueueDir, Value: ""},
	{Key: target.ElasticQueueLimit, Value: "0"},
	{Key: target.ElasticUsername, Value: ""},
	{Key: target.ElasticPassword, Value: ""},
}
// GetNotifyES - returns a map of registered notification 'elasticsearch' targets
//
// esKVS is combined with DefaultESKVS through config.Merge and every resulting
// target is inspected. Targets that are not enabled are left out of the
// result. For each setting the environment variable name is the base name for
// the default target, and the base name followed by config.Default and the
// target name otherwise; env.Get is called with the stored value as its second
// argument. The supplied transport is stored unchanged in every returned
// ElasticsearchArgs.
//
// An error is returned as soon as the enable flag, the URL, or the queue limit
// of any target fails to parse, or when ElasticsearchArgs.Validate rejects the
// assembled arguments.
func GetNotifyES(esKVS map[string]config.KVS, transport *http.Transport) (map[string]target.ElasticsearchArgs, error) {
	esTargets := make(map[string]target.ElasticsearchArgs)
	for k, kv := range config.Merge(esKVS, target.EnvElasticEnable, DefaultESKVS) {
		// lookup resolves a single setting of the current target.
		lookup := func(envBase, key string) string {
			if k != config.Default {
				envBase = envBase + config.Default + k
			}
			return env.Get(envBase, kv.Get(key))
		}

		enabled, err := config.ParseBool(lookup(target.EnvElasticEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		esURL, err := xnet.ParseHTTPURL(lookup(target.EnvElasticURL, target.ElasticURL))
		if err != nil {
			return nil, err
		}

		// Parse as unsigned so a negative limit is rejected instead of
		// wrapping around to a huge uint64 value.
		queueLimit, err := strconv.ParseUint(lookup(target.EnvElasticQueueLimit, target.ElasticQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		esArgs := target.ElasticsearchArgs{
			Enable:     enabled,
			Format:     lookup(target.EnvElasticFormat, target.ElasticFormat),
			URL:        *esURL,
			Index:      lookup(target.EnvElasticIndex, target.ElasticIndex),
			QueueDir:   lookup(target.EnvElasticQueueDir, target.ElasticQueueDir),
			QueueLimit: queueLimit,
			Transport:  transport,
			Username:   lookup(target.EnvElasticUsername, target.ElasticUsername),
			Password:   lookup(target.EnvElasticPassword, target.ElasticPassword),
		}
		if err = esArgs.Validate(); err != nil {
			return nil, err
		}
		esTargets[k] = esArgs
	}
	return esTargets, nil
}
// DefaultAMQPKVS - default KV for AMQP config.
//
// The target is disabled by default; the mandatory, durable, no-wait,
// internal, auto-deleted and publisher-confirms switches are off; the delivery
// mode and queue limit are "0"; the URL, exchange, exchange type, routing key
// and queue directory are empty. GetNotifyAMQP passes these defaults to
// config.Merge together with the configured AMQP targets.
//
// NOTE(review): there is no entry for target.AmqpImmediate although
// GetNotifyAMQP reads that key - confirm the omission is intentional.
var DefaultAMQPKVS = config.KVS{
	{Key: config.Enable, Value: config.EnableOff},
	{Key: target.AmqpURL, Value: ""},
	{Key: target.AmqpExchange, Value: ""},
	{Key: target.AmqpExchangeType, Value: ""},
	{Key: target.AmqpRoutingKey, Value: ""},
	{Key: target.AmqpMandatory, Value: config.EnableOff},
	{Key: target.AmqpDurable, Value: config.EnableOff},
	{Key: target.AmqpNoWait, Value: config.EnableOff},
	{Key: target.AmqpInternal, Value: config.EnableOff},
	{Key: target.AmqpAutoDeleted, Value: config.EnableOff},
	{Key: target.AmqpDeliveryMode, Value: "0"},
	{Key: target.AmqpPublisherConfirms, Value: config.EnableOff},
	{Key: target.AmqpQueueLimit, Value: "0"},
	{Key: target.AmqpQueueDir, Value: ""},
}
// GetNotifyAMQP - returns a map of registered notification 'amqp' targets
//
// amqpKVS is combined with DefaultAMQPKVS through config.Merge and every
// resulting target is inspected. Targets that are not enabled are left out of
// the result. For each setting the environment variable name is the base name
// for the default target, and the base name followed by config.Default and the
// target name otherwise; env.Get is called with the stored value as its second
// argument.
//
// An error is returned as soon as the enable flag, the URL, the delivery mode,
// or the queue limit of any target fails to parse, or when AMQPArgs.Validate
// rejects the assembled arguments.
func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) {
	amqpTargets := make(map[string]target.AMQPArgs)
	for k, kv := range config.Merge(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) {
		// lookup resolves a single setting of the current target.
		lookup := func(envBase, key string) string {
			if k != config.Default {
				envBase = envBase + config.Default + k
			}
			return env.Get(envBase, kv.Get(key))
		}
		// isOn reports whether a setting is exactly config.EnableOn; any
		// other value, including an empty one, counts as off.
		isOn := func(envBase, key string) bool {
			return lookup(envBase, key) == config.EnableOn
		}

		enabled, err := config.ParseBool(lookup(target.EnvAMQPEnable, config.Enable))
		if err != nil {
			return nil, err
		}
		if !enabled {
			continue
		}

		amqpURL, err := xnet.ParseURL(lookup(target.EnvAMQPURL, target.AmqpURL))
		if err != nil {
			return nil, err
		}

		// Parse with an 8-bit size so values that do not fit the uint8 field
		// are rejected instead of being silently truncated by the conversion.
		deliveryMode, err := strconv.ParseUint(lookup(target.EnvAMQPDeliveryMode, target.AmqpDeliveryMode), 10, 8)
		if err != nil {
			return nil, err
		}

		queueLimit, err := strconv.ParseUint(lookup(target.EnvAMQPQueueLimit, target.AmqpQueueLimit), 10, 64)
		if err != nil {
			return nil, err
		}

		amqpArgs := target.AMQPArgs{
			Enable:            enabled,
			URL:               *amqpURL,
			Exchange:          lookup(target.EnvAMQPExchange, target.AmqpExchange),
			RoutingKey:        lookup(target.EnvAMQPRoutingKey, target.AmqpRoutingKey),
			ExchangeType:      lookup(target.EnvAMQPExchangeType, target.AmqpExchangeType),
			DeliveryMode:      uint8(deliveryMode),
			Mandatory:         isOn(target.EnvAMQPMandatory, target.AmqpMandatory),
			Immediate:         isOn(target.EnvAMQPImmediate, target.AmqpImmediate),
			Durable:           isOn(target.EnvAMQPDurable, target.AmqpDurable),
			Internal:          isOn(target.EnvAMQPInternal, target.AmqpInternal),
			NoWait:            isOn(target.EnvAMQPNoWait, target.AmqpNoWait),
			AutoDeleted:       isOn(target.EnvAMQPAutoDeleted, target.AmqpAutoDeleted),
			PublisherConfirms: isOn(target.EnvAMQPPublisherConfirms, target.AmqpPublisherConfirms),
			QueueDir:          lookup(target.EnvAMQPQueueDir, target.AmqpQueueDir),
			QueueLimit:        queueLimit,
		}
		if err = amqpArgs.Validate(); err != nil {
			return nil, err
		}
		amqpTargets[k] = amqpArgs
	}
	return amqpTargets, nil
}