minio/internal/config/notify/parse.go

1725 lines
45 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package notify
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/event/target"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
xnet "github.com/minio/pkg/v2/net"
)
const (
formatNamespace = "namespace"
)
// ErrTargetsOffline - Indicates single/multiple target failures.
var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets")
// TestSubSysNotificationTargets - tests notification targets of given subsystem
func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) error {
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
return err
}
targetList, err := fetchSubSysTargets(ctx, cfg, subSys, transport)
if err != nil {
return err
}
for _, target := range targetList {
defer target.Close()
}
tgts, ok := ctx.Value(config.ContextKeyForTargetFromConfig).(map[string]bool)
if !ok {
tgts = make(map[string]bool)
}
for _, target := range targetList {
if tgts[target.ID().ID] {
// When target set should be online
yes, err := target.IsActive()
if err == nil && !yes {
err = ErrTargetsOffline
}
if err != nil {
return fmt.Errorf("error (%s): %w", target.ID(), err)
}
} else {
// Just for call init.
// Ignore target is online or offline
_, _ = target.IsActive()
}
}
return nil
}
func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) (targets []event.Target, err error) {
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
return nil, err
}
switch subSys {
case config.NotifyAMQPSubSys:
amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
if err != nil {
return nil, err
}
for id, args := range amqpTargets {
if !args.Enable {
continue
}
t, err := target.NewAMQPTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyESSubSys:
esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport)
if err != nil {
return nil, err
}
for id, args := range esTargets {
if !args.Enable {
continue
}
t, err := target.NewElasticsearchTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyKafkaSubSys:
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
if err != nil {
return nil, err
}
for id, args := range kafkaTargets {
if !args.Enable {
continue
}
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
t, err := target.NewKafkaTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyMQTTSubSys:
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
for id, args := range mqttTargets {
if !args.Enable {
continue
}
args.RootCAs = transport.TLSClientConfig.RootCAs
t, err := target.NewMQTTTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyMySQLSubSys:
mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
if err != nil {
return nil, err
}
for id, args := range mysqlTargets {
if !args.Enable {
continue
}
t, err := target.NewMySQLTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyNATSSubSys:
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
for id, args := range natsTargets {
if !args.Enable {
continue
}
t, err := target.NewNATSTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyNSQSubSys:
nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
if err != nil {
return nil, err
}
for id, args := range nsqTargets {
if !args.Enable {
continue
}
t, err := target.NewNSQTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyPostgresSubSys:
postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
if err != nil {
return nil, err
}
for id, args := range postgresTargets {
if !args.Enable {
continue
}
t, err := target.NewPostgreSQLTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyRedisSubSys:
redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
if err != nil {
return nil, err
}
for id, args := range redisTargets {
if !args.Enable {
continue
}
t, err := target.NewRedisTarget(id, args, logger.LogOnceIf)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
case config.NotifyWebhookSubSys:
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
if err != nil {
return nil, err
}
for id, args := range webhookTargets {
if !args.Enable {
continue
}
t, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport)
if err != nil {
return nil, err
}
targets = append(targets, t)
}
}
return targets, nil
}
// FetchEnabledTargets - Returns a set of configured TargetList
func FetchEnabledTargets(ctx context.Context, cfg config.Config, transport *http.Transport) (_ *event.TargetList, err error) {
targetList := event.NewTargetList()
for _, subSys := range config.NotifySubSystems.ToSlice() {
targets, err := fetchSubSysTargets(ctx, cfg, subSys, transport)
if err != nil {
return nil, err
}
for _, t := range targets {
if err = targetList.Add(t); err != nil {
return nil, err
}
}
}
return targetList, nil
}
// DefaultNotificationKVS - default notification list of kvs.
var (
DefaultNotificationKVS = map[string]config.KVS{
config.NotifyAMQPSubSys: DefaultAMQPKVS,
config.NotifyKafkaSubSys: DefaultKafkaKVS,
config.NotifyMQTTSubSys: DefaultMQTTKVS,
config.NotifyMySQLSubSys: DefaultMySQLKVS,
config.NotifyNATSSubSys: DefaultNATSKVS,
config.NotifyNSQSubSys: DefaultNSQKVS,
config.NotifyPostgresSubSys: DefaultPostgresKVS,
config.NotifyRedisSubSys: DefaultRedisKVS,
config.NotifyWebhookSubSys: DefaultWebhookKVS,
config.NotifyESSubSys: DefaultESKVS,
}
)
func checkValidNotificationKeysForSubSys(subSys string, tgt map[string]config.KVS) error {
validKVS, ok := DefaultNotificationKVS[subSys]
if !ok {
return nil
}
for tname, kv := range tgt {
subSysTarget := subSys
if tname != config.Default {
subSysTarget = subSys + config.SubSystemSeparator + tname
}
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
return err
}
}
}
return nil
}
// DefaultKakfaKVS - default KV for kafka target
var (
DefaultKafkaKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTopic,
Value: "",
},
config.KV{
Key: target.KafkaBrokers,
Value: "",
},
config.KV{
Key: target.KafkaSASLUsername,
Value: "",
},
config.KV{
Key: target.KafkaSASLPassword,
Value: "",
},
config.KV{
Key: target.KafkaSASLMechanism,
Value: "plain",
},
config.KV{
Key: target.KafkaClientTLSCert,
Value: "",
},
config.KV{
Key: target.KafkaClientTLSKey,
Value: "",
},
config.KV{
Key: target.KafkaTLSClientAuth,
Value: "0",
},
config.KV{
Key: target.KafkaSASL,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTLS,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTLSSkipVerify,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaQueueLimit,
Value: "0",
},
config.KV{
Key: target.KafkaQueueDir,
Value: "",
},
config.KV{
Key: target.KafkaVersion,
Value: "",
},
config.KV{
Key: target.KafkaBatchSize,
Value: "0",
},
}
)
// GetNotifyKafka - returns a map of registered notification 'kafka' targets
func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) {
kafkaTargets := make(map[string]target.KafkaArgs)
for k, kv := range config.Merge(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) {
enableEnv := target.EnvKafkaEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
var brokers []xnet.Host
brokersEnv := target.EnvKafkaBrokers
if k != config.Default {
brokersEnv = brokersEnv + config.Default + k
}
kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers))
if len(kafkaBrokers) == 0 {
return nil, config.Errorf("kafka 'brokers' cannot be empty")
}
for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
var host *xnet.Host
host, err = xnet.ParseHost(s)
if err != nil {
break
}
brokers = append(brokers, *host)
}
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvKafkaQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.KafkaQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
clientAuthEnv := target.EnvKafkaTLSClientAuth
if k != config.Default {
clientAuthEnv = clientAuthEnv + config.Default + k
}
clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(target.KafkaTLSClientAuth)))
if err != nil {
return nil, err
}
topicEnv := target.EnvKafkaTopic
if k != config.Default {
topicEnv = topicEnv + config.Default + k
}
queueDirEnv := target.EnvKafkaQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
versionEnv := target.EnvKafkaVersion
if k != config.Default {
versionEnv = versionEnv + config.Default + k
}
batchSizeEnv := target.EnvKafkaBatchSize
if k != config.Default {
batchSizeEnv = batchSizeEnv + config.Default + k
}
batchSize, err := strconv.ParseUint(env.Get(batchSizeEnv, kv.Get(target.KafkaBatchSize)), 10, 32)
if err != nil {
return nil, err
}
kafkaArgs := target.KafkaArgs{
Enable: enabled,
Brokers: brokers,
Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)),
QueueLimit: queueLimit,
Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)),
BatchSize: uint32(batchSize),
}
tlsEnableEnv := target.EnvKafkaTLS
if k != config.Default {
tlsEnableEnv = tlsEnableEnv + config.Default + k
}
tlsSkipVerifyEnv := target.EnvKafkaTLSSkipVerify
if k != config.Default {
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
}
tlsClientTLSCertEnv := target.EnvKafkaClientTLSCert
if k != config.Default {
tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k
}
tlsClientTLSKeyEnv := target.EnvKafkaClientTLSKey
if k != config.Default {
tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k
}
kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.KafkaTLS)) == config.EnableOn
kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.KafkaTLSSkipVerify)) == config.EnableOn
kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth)
kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert))
kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey))
saslEnableEnv := target.EnvKafkaSASLEnable
if k != config.Default {
saslEnableEnv = saslEnableEnv + config.Default + k
}
saslUsernameEnv := target.EnvKafkaSASLUsername
if k != config.Default {
saslUsernameEnv = saslUsernameEnv + config.Default + k
}
saslPasswordEnv := target.EnvKafkaSASLPassword
if k != config.Default {
saslPasswordEnv = saslPasswordEnv + config.Default + k
}
saslMechanismEnv := target.EnvKafkaSASLMechanism
if k != config.Default {
saslMechanismEnv = saslMechanismEnv + config.Default + k
}
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn
kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername))
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword))
kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(target.KafkaSASLMechanism))
if err = kafkaArgs.Validate(); err != nil {
return nil, err
}
kafkaTargets[k] = kafkaArgs
}
return kafkaTargets, nil
}
// DefaultMQTTKVS - default MQTT config
var (
DefaultMQTTKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.MqttBroker,
Value: "",
},
config.KV{
Key: target.MqttTopic,
Value: "",
},
config.KV{
Key: target.MqttPassword,
Value: "",
},
config.KV{
Key: target.MqttUsername,
Value: "",
},
config.KV{
Key: target.MqttQoS,
Value: "0",
},
config.KV{
Key: target.MqttKeepAliveInterval,
Value: "0s",
},
config.KV{
Key: target.MqttReconnectInterval,
Value: "0s",
},
config.KV{
Key: target.MqttQueueDir,
Value: "",
},
config.KV{
Key: target.MqttQueueLimit,
Value: "0",
},
}
)
// GetNotifyMQTT - returns a map of registered notification 'mqtt' targets
func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) {
mqttTargets := make(map[string]target.MQTTArgs)
for k, kv := range config.Merge(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) {
enableEnv := target.EnvMQTTEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
brokerEnv := target.EnvMQTTBroker
if k != config.Default {
brokerEnv = brokerEnv + config.Default + k
}
brokerURL, err := xnet.ParseURL(env.Get(brokerEnv, kv.Get(target.MqttBroker)))
if err != nil {
return nil, err
}
reconnectIntervalEnv := target.EnvMQTTReconnectInterval
if k != config.Default {
reconnectIntervalEnv = reconnectIntervalEnv + config.Default + k
}
reconnectInterval, err := time.ParseDuration(env.Get(reconnectIntervalEnv,
kv.Get(target.MqttReconnectInterval)))
if err != nil {
return nil, err
}
keepAliveIntervalEnv := target.EnvMQTTKeepAliveInterval
if k != config.Default {
keepAliveIntervalEnv = keepAliveIntervalEnv + config.Default + k
}
keepAliveInterval, err := time.ParseDuration(env.Get(keepAliveIntervalEnv,
kv.Get(target.MqttKeepAliveInterval)))
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvMQTTQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MqttQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
qosEnv := target.EnvMQTTQoS
if k != config.Default {
qosEnv = qosEnv + config.Default + k
}
// Parse uint8 value
qos, err := strconv.ParseUint(env.Get(qosEnv, kv.Get(target.MqttQoS)), 10, 8)
if err != nil {
return nil, err
}
topicEnv := target.EnvMQTTTopic
if k != config.Default {
topicEnv = topicEnv + config.Default + k
}
usernameEnv := target.EnvMQTTUsername
if k != config.Default {
usernameEnv = usernameEnv + config.Default + k
}
passwordEnv := target.EnvMQTTPassword
if k != config.Default {
passwordEnv = passwordEnv + config.Default + k
}
queueDirEnv := target.EnvMQTTQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
mqttArgs := target.MQTTArgs{
Enable: enabled,
Broker: *brokerURL,
Topic: env.Get(topicEnv, kv.Get(target.MqttTopic)),
QoS: byte(qos),
User: env.Get(usernameEnv, kv.Get(target.MqttUsername)),
Password: env.Get(passwordEnv, kv.Get(target.MqttPassword)),
MaxReconnectInterval: reconnectInterval,
KeepAlive: keepAliveInterval,
RootCAs: rootCAs,
QueueDir: env.Get(queueDirEnv, kv.Get(target.MqttQueueDir)),
QueueLimit: queueLimit,
}
if err = mqttArgs.Validate(); err != nil {
return nil, err
}
mqttTargets[k] = mqttArgs
}
return mqttTargets, nil
}
// DefaultMySQLKVS - default KV for MySQL
var (
DefaultMySQLKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.MySQLFormat,
Value: formatNamespace,
},
config.KV{
Key: target.MySQLDSNString,
Value: "",
},
config.KV{
Key: target.MySQLTable,
Value: "",
},
config.KV{
Key: target.MySQLQueueDir,
Value: "",
},
config.KV{
Key: target.MySQLQueueLimit,
Value: "0",
},
config.KV{
Key: target.MySQLMaxOpenConnections,
Value: "2",
},
}
)
// GetNotifyMySQL - returns a map of registered notification 'mysql' targets
func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) {
mysqlTargets := make(map[string]target.MySQLArgs)
for k, kv := range config.Merge(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) {
enableEnv := target.EnvMySQLEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
queueLimitEnv := target.EnvMySQLQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MySQLQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
formatEnv := target.EnvMySQLFormat
if k != config.Default {
formatEnv = formatEnv + config.Default + k
}
dsnStringEnv := target.EnvMySQLDSNString
if k != config.Default {
dsnStringEnv = dsnStringEnv + config.Default + k
}
tableEnv := target.EnvMySQLTable
if k != config.Default {
tableEnv = tableEnv + config.Default + k
}
queueDirEnv := target.EnvMySQLQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
maxOpenConnectionsEnv := target.EnvMySQLMaxOpenConnections
if k != config.Default {
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
}
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.MySQLMaxOpenConnections)))
if cErr != nil {
return nil, cErr
}
mysqlArgs := target.MySQLArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)),
DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)),
Table: env.Get(tableEnv, kv.Get(target.MySQLTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)),
QueueLimit: queueLimit,
MaxOpenConnections: maxOpenConnections,
}
if err = mysqlArgs.Validate(); err != nil {
return nil, err
}
mysqlTargets[k] = mysqlArgs
}
return mysqlTargets, nil
}
// DefaultNATSKVS - NATS KV for nats config.
var (
DefaultNATSKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSAddress,
Value: "",
},
config.KV{
Key: target.NATSSubject,
Value: "",
},
config.KV{
Key: target.NATSUsername,
Value: "",
},
config.KV{
Key: target.NATSPassword,
Value: "",
},
config.KV{
Key: target.NATSToken,
Value: "",
},
config.KV{
Key: target.NATSTLS,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSTLSSkipVerify,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSCertAuthority,
Value: "",
},
config.KV{
Key: target.NATSClientCert,
Value: "",
},
config.KV{
Key: target.NATSClientKey,
Value: "",
},
config.KV{
Key: target.NATSPingInterval,
Value: "0",
},
config.KV{
Key: target.NATSJetStream,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSStreaming,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSStreamingAsync,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSStreamingMaxPubAcksInFlight,
Value: "0",
},
config.KV{
Key: target.NATSStreamingClusterID,
Value: "",
},
config.KV{
Key: target.NATSQueueDir,
Value: "",
},
config.KV{
Key: target.NATSQueueLimit,
Value: "0",
},
}
)
// GetNotifyNATS - returns a map of registered notification 'nats' targets
func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.NATSArgs, error) {
natsTargets := make(map[string]target.NATSArgs)
for k, kv := range config.Merge(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) {
enableEnv := target.EnvNATSEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
addressEnv := target.EnvNATSAddress
if k != config.Default {
addressEnv = addressEnv + config.Default + k
}
address, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NATSAddress)))
if err != nil {
return nil, err
}
pingIntervalEnv := target.EnvNATSPingInterval
if k != config.Default {
pingIntervalEnv = pingIntervalEnv + config.Default + k
}
pingInterval, err := strconv.ParseInt(env.Get(pingIntervalEnv, kv.Get(target.NATSPingInterval)), 10, 64)
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvNATSQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NATSQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
tlsEnv := target.EnvNATSTLS
if k != config.Default {
tlsEnv = tlsEnv + config.Default + k
}
tlsSkipVerifyEnv := target.EnvNATSTLSSkipVerify
if k != config.Default {
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
}
subjectEnv := target.EnvNATSSubject
if k != config.Default {
subjectEnv = subjectEnv + config.Default + k
}
usernameEnv := target.EnvNATSUsername
if k != config.Default {
usernameEnv = usernameEnv + config.Default + k
}
passwordEnv := target.EnvNATSPassword
if k != config.Default {
passwordEnv = passwordEnv + config.Default + k
}
tokenEnv := target.EnvNATSToken
if k != config.Default {
tokenEnv = tokenEnv + config.Default + k
}
queueDirEnv := target.EnvNATSQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
certAuthorityEnv := target.EnvNATSCertAuthority
if k != config.Default {
certAuthorityEnv = certAuthorityEnv + config.Default + k
}
clientCertEnv := target.EnvNATSClientCert
if k != config.Default {
clientCertEnv = clientCertEnv + config.Default + k
}
clientKeyEnv := target.EnvNATSClientKey
if k != config.Default {
clientKeyEnv = clientKeyEnv + config.Default + k
}
jetStreamEnableEnv := target.EnvNATSJetStream
if k != config.Default {
jetStreamEnableEnv = jetStreamEnableEnv + config.Default + k
}
natsArgs := target.NATSArgs{
Enable: true,
Address: *address,
Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)),
Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)),
Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)),
CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)),
ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)),
ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)),
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn,
TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn,
PingInterval: pingInterval,
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
QueueLimit: queueLimit,
RootCAs: rootCAs,
}
natsArgs.JetStream.Enable = env.Get(jetStreamEnableEnv, kv.Get(target.NATSJetStream)) == config.EnableOn
streamingEnableEnv := target.EnvNATSStreaming
if k != config.Default {
streamingEnableEnv = streamingEnableEnv + config.Default + k
}
streamingEnabled := env.Get(streamingEnableEnv, kv.Get(target.NATSStreaming)) == config.EnableOn
if streamingEnabled {
asyncEnv := target.EnvNATSStreamingAsync
if k != config.Default {
asyncEnv = asyncEnv + config.Default + k
}
maxPubAcksInflightEnv := target.EnvNATSStreamingMaxPubAcksInFlight
if k != config.Default {
maxPubAcksInflightEnv = maxPubAcksInflightEnv + config.Default + k
}
maxPubAcksInflight, err := strconv.Atoi(env.Get(maxPubAcksInflightEnv,
kv.Get(target.NATSStreamingMaxPubAcksInFlight)))
if err != nil {
return nil, err
}
clusterIDEnv := target.EnvNATSStreamingClusterID
if k != config.Default {
clusterIDEnv = clusterIDEnv + config.Default + k
}
natsArgs.Streaming.Enable = streamingEnabled
natsArgs.Streaming.ClusterID = env.Get(clusterIDEnv, kv.Get(target.NATSStreamingClusterID))
natsArgs.Streaming.Async = env.Get(asyncEnv, kv.Get(target.NATSStreamingAsync)) == config.EnableOn
natsArgs.Streaming.MaxPubAcksInflight = maxPubAcksInflight
}
if err = natsArgs.Validate(); err != nil {
return nil, err
}
natsTargets[k] = natsArgs
}
return natsTargets, nil
}
// DefaultNSQKVS - NSQ KV for config
var (
DefaultNSQKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQAddress,
Value: "",
},
config.KV{
Key: target.NSQTopic,
Value: "",
},
config.KV{
Key: target.NSQTLS,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQTLSSkipVerify,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQQueueDir,
Value: "",
},
config.KV{
Key: target.NSQQueueLimit,
Value: "0",
},
}
)
// GetNotifyNSQ - returns a map of registered notification 'nsq' targets
func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) {
nsqTargets := make(map[string]target.NSQArgs)
for k, kv := range config.Merge(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) {
enableEnv := target.EnvNSQEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
addressEnv := target.EnvNSQAddress
if k != config.Default {
addressEnv = addressEnv + config.Default + k
}
nsqdAddress, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NSQAddress)))
if err != nil {
return nil, err
}
tlsEnableEnv := target.EnvNSQTLS
if k != config.Default {
tlsEnableEnv = tlsEnableEnv + config.Default + k
}
tlsSkipVerifyEnv := target.EnvNSQTLSSkipVerify
if k != config.Default {
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
}
queueLimitEnv := target.EnvNSQQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NSQQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
topicEnv := target.EnvNSQTopic
if k != config.Default {
topicEnv = topicEnv + config.Default + k
}
queueDirEnv := target.EnvNSQQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
nsqArgs := target.NSQArgs{
Enable: enabled,
NSQDAddress: *nsqdAddress,
Topic: env.Get(topicEnv, kv.Get(target.NSQTopic)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.NSQQueueDir)),
QueueLimit: queueLimit,
}
nsqArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.NSQTLS)) == config.EnableOn
nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.EnableOn
if err = nsqArgs.Validate(); err != nil {
return nil, err
}
nsqTargets[k] = nsqArgs
}
return nsqTargets, nil
}
// DefaultPostgresKVS - default Postgres KV for server config.
var (
DefaultPostgresKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.PostgresFormat,
Value: formatNamespace,
},
config.KV{
Key: target.PostgresConnectionString,
Value: "",
},
config.KV{
Key: target.PostgresTable,
Value: "",
},
config.KV{
Key: target.PostgresQueueDir,
Value: "",
},
config.KV{
Key: target.PostgresQueueLimit,
Value: "0",
},
config.KV{
Key: target.PostgresMaxOpenConnections,
Value: "2",
},
}
)
// GetNotifyPostgres - returns a map of registered notification 'postgres' targets
func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) {
psqlTargets := make(map[string]target.PostgreSQLArgs)
for k, kv := range config.Merge(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) {
enableEnv := target.EnvPostgresEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
queueLimitEnv := target.EnvPostgresQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.PostgresQueueLimit)))
if err != nil {
return nil, err
}
formatEnv := target.EnvPostgresFormat
if k != config.Default {
formatEnv = formatEnv + config.Default + k
}
connectionStringEnv := target.EnvPostgresConnectionString
if k != config.Default {
connectionStringEnv = connectionStringEnv + config.Default + k
}
tableEnv := target.EnvPostgresTable
if k != config.Default {
tableEnv = tableEnv + config.Default + k
}
queueDirEnv := target.EnvPostgresQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
maxOpenConnectionsEnv := target.EnvPostgresMaxOpenConnections
if k != config.Default {
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
}
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.PostgresMaxOpenConnections)))
if cErr != nil {
return nil, cErr
}
psqlArgs := target.PostgreSQLArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)),
ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)),
Table: env.Get(tableEnv, kv.Get(target.PostgresTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)),
QueueLimit: uint64(queueLimit),
MaxOpenConnections: maxOpenConnections,
}
if err = psqlArgs.Validate(); err != nil {
return nil, err
}
psqlTargets[k] = psqlArgs
}
return psqlTargets, nil
}
// DefaultRedisKVS - default KV for redis config
var (
DefaultRedisKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.RedisFormat,
Value: formatNamespace,
},
config.KV{
Key: target.RedisAddress,
Value: "",
},
config.KV{
Key: target.RedisKey,
Value: "",
},
config.KV{
Key: target.RedisPassword,
Value: "",
},
config.KV{
Key: target.RedisQueueDir,
Value: "",
},
config.KV{
Key: target.RedisQueueLimit,
Value: "0",
},
}
)
// GetNotifyRedis - returns a map of registered notification 'redis' targets
func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) {
redisTargets := make(map[string]target.RedisArgs)
for k, kv := range config.Merge(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) {
enableEnv := target.EnvRedisEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
addressEnv := target.EnvRedisAddress
if k != config.Default {
addressEnv = addressEnv + config.Default + k
}
addr, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.RedisAddress)))
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvRedisQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.RedisQueueLimit)))
if err != nil {
return nil, err
}
formatEnv := target.EnvRedisFormat
if k != config.Default {
formatEnv = formatEnv + config.Default + k
}
passwordEnv := target.EnvRedisPassword
if k != config.Default {
passwordEnv = passwordEnv + config.Default + k
}
keyEnv := target.EnvRedisKey
if k != config.Default {
keyEnv = keyEnv + config.Default + k
}
queueDirEnv := target.EnvRedisQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
redisArgs := target.RedisArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.RedisFormat)),
Addr: *addr,
Password: env.Get(passwordEnv, kv.Get(target.RedisPassword)),
Key: env.Get(keyEnv, kv.Get(target.RedisKey)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.RedisQueueDir)),
QueueLimit: uint64(queueLimit),
}
if err = redisArgs.Validate(); err != nil {
return nil, err
}
redisTargets[k] = redisArgs
}
return redisTargets, nil
}
// DefaultWebhookKVS - default KV for webhook config
var (
DefaultWebhookKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.WebhookEndpoint,
Value: "",
},
config.KV{
Key: target.WebhookAuthToken,
Value: "",
},
config.KV{
Key: target.WebhookQueueLimit,
Value: "0",
},
config.KV{
Key: target.WebhookQueueDir,
Value: "",
},
config.KV{
Key: target.WebhookClientCert,
Value: "",
},
config.KV{
Key: target.WebhookClientKey,
Value: "",
},
}
)
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) (
map[string]target.WebhookArgs, error,
) {
webhookTargets := make(map[string]target.WebhookArgs)
for k, kv := range config.Merge(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
enableEnv := target.EnvWebhookEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
urlEnv := target.EnvWebhookEndpoint
if k != config.Default {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint)))
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvWebhookQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.WebhookQueueLimit)))
if err != nil {
return nil, err
}
queueDirEnv := target.EnvWebhookQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
authEnv := target.EnvWebhookAuthToken
if k != config.Default {
authEnv = authEnv + config.Default + k
}
clientCertEnv := target.EnvWebhookClientCert
if k != config.Default {
clientCertEnv = clientCertEnv + config.Default + k
}
clientKeyEnv := target.EnvWebhookClientKey
if k != config.Default {
clientKeyEnv = clientKeyEnv + config.Default + k
}
webhookArgs := target.WebhookArgs{
Enable: enabled,
Endpoint: *url,
Transport: transport,
AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.WebhookQueueDir)),
QueueLimit: uint64(queueLimit),
ClientCert: env.Get(clientCertEnv, kv.Get(target.WebhookClientCert)),
ClientKey: env.Get(clientKeyEnv, kv.Get(target.WebhookClientKey)),
}
if err = webhookArgs.Validate(); err != nil {
return nil, err
}
webhookTargets[k] = webhookArgs
}
return webhookTargets, nil
}
// DefaultESKVS - default KV config for Elasticsearch target
var (
DefaultESKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.ElasticURL,
Value: "",
},
config.KV{
Key: target.ElasticFormat,
Value: formatNamespace,
},
config.KV{
Key: target.ElasticIndex,
Value: "",
},
config.KV{
Key: target.ElasticQueueDir,
Value: "",
},
config.KV{
Key: target.ElasticQueueLimit,
Value: "0",
},
config.KV{
Key: target.ElasticUsername,
Value: "",
},
config.KV{
Key: target.ElasticPassword,
Value: "",
},
}
)
// GetNotifyES - returns a map of registered notification 'elasticsearch' targets
func GetNotifyES(esKVS map[string]config.KVS, transport *http.Transport) (map[string]target.ElasticsearchArgs, error) {
esTargets := make(map[string]target.ElasticsearchArgs)
for k, kv := range config.Merge(esKVS, target.EnvElasticEnable, DefaultESKVS) {
enableEnv := target.EnvElasticEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
urlEnv := target.EnvElasticURL
if k != config.Default {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.ElasticURL)))
if err != nil {
return nil, err
}
queueLimitEnv := target.EnvElasticQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.ElasticQueueLimit)))
if err != nil {
return nil, err
}
formatEnv := target.EnvElasticFormat
if k != config.Default {
formatEnv = formatEnv + config.Default + k
}
indexEnv := target.EnvElasticIndex
if k != config.Default {
indexEnv = indexEnv + config.Default + k
}
queueDirEnv := target.EnvElasticQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
usernameEnv := target.EnvElasticUsername
if k != config.Default {
usernameEnv = usernameEnv + config.Default + k
}
passwordEnv := target.EnvElasticPassword
if k != config.Default {
passwordEnv = passwordEnv + config.Default + k
}
esArgs := target.ElasticsearchArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.ElasticFormat)),
URL: *url,
Index: env.Get(indexEnv, kv.Get(target.ElasticIndex)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.ElasticQueueDir)),
QueueLimit: uint64(queueLimit),
Transport: transport,
Username: env.Get(usernameEnv, kv.Get(target.ElasticUsername)),
Password: env.Get(passwordEnv, kv.Get(target.ElasticPassword)),
}
if err = esArgs.Validate(); err != nil {
return nil, err
}
esTargets[k] = esArgs
}
return esTargets, nil
}
// DefaultAMQPKVS - default KV for AMQP config
var (
DefaultAMQPKVS = config.KVS{
config.KV{
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpURL,
Value: "",
},
config.KV{
Key: target.AmqpExchange,
Value: "",
},
config.KV{
Key: target.AmqpExchangeType,
Value: "",
},
config.KV{
Key: target.AmqpRoutingKey,
Value: "",
},
config.KV{
Key: target.AmqpMandatory,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpDurable,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpNoWait,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpInternal,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpAutoDeleted,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpDeliveryMode,
Value: "0",
},
config.KV{
Key: target.AmqpPublisherConfirms,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpQueueLimit,
Value: "0",
},
config.KV{
Key: target.AmqpQueueDir,
Value: "",
},
}
)
// GetNotifyAMQP - returns a map of registered notification 'amqp' targets
func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) {
amqpTargets := make(map[string]target.AMQPArgs)
for k, kv := range config.Merge(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) {
enableEnv := target.EnvAMQPEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
if !enabled {
continue
}
urlEnv := target.EnvAMQPURL
if k != config.Default {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.AmqpURL)))
if err != nil {
return nil, err
}
deliveryModeEnv := target.EnvAMQPDeliveryMode
if k != config.Default {
deliveryModeEnv = deliveryModeEnv + config.Default + k
}
deliveryMode, err := strconv.Atoi(env.Get(deliveryModeEnv, kv.Get(target.AmqpDeliveryMode)))
if err != nil {
return nil, err
}
exchangeEnv := target.EnvAMQPExchange
if k != config.Default {
exchangeEnv = exchangeEnv + config.Default + k
}
routingKeyEnv := target.EnvAMQPRoutingKey
if k != config.Default {
routingKeyEnv = routingKeyEnv + config.Default + k
}
exchangeTypeEnv := target.EnvAMQPExchangeType
if k != config.Default {
exchangeTypeEnv = exchangeTypeEnv + config.Default + k
}
mandatoryEnv := target.EnvAMQPMandatory
if k != config.Default {
mandatoryEnv = mandatoryEnv + config.Default + k
}
immediateEnv := target.EnvAMQPImmediate
if k != config.Default {
immediateEnv = immediateEnv + config.Default + k
}
durableEnv := target.EnvAMQPDurable
if k != config.Default {
durableEnv = durableEnv + config.Default + k
}
internalEnv := target.EnvAMQPInternal
if k != config.Default {
internalEnv = internalEnv + config.Default + k
}
noWaitEnv := target.EnvAMQPNoWait
if k != config.Default {
noWaitEnv = noWaitEnv + config.Default + k
}
autoDeletedEnv := target.EnvAMQPAutoDeleted
if k != config.Default {
autoDeletedEnv = autoDeletedEnv + config.Default + k
}
publisherConfirmsEnv := target.EnvAMQPPublisherConfirms
if k != config.Default {
publisherConfirmsEnv = publisherConfirmsEnv + config.Default + k
}
queueDirEnv := target.EnvAMQPQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
queueLimitEnv := target.EnvAMQPQueueLimit
if k != config.Default {
queueLimitEnv = queueLimitEnv + config.Default + k
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.AmqpQueueLimit)), 10, 64)
if err != nil {
return nil, err
}
amqpArgs := target.AMQPArgs{
Enable: enabled,
URL: *url,
Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)),
RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)),
ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)),
DeliveryMode: uint8(deliveryMode),
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn,
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn,
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn,
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn,
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn,
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn,
PublisherConfirms: env.Get(publisherConfirmsEnv, kv.Get(target.AmqpPublisherConfirms)) == config.EnableOn,
QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)),
QueueLimit: queueLimit,
}
if err = amqpArgs.Validate(); err != nil {
return nil, err
}
amqpTargets[k] = amqpArgs
}
return amqpTargets, nil
}