mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
476111968a
Final update to all messages across sub-systems after final review, the only change here is that NATS now has TLS and TLSSkipVerify to be consistent for all other notification targets.
585 lines
12 KiB
Go
585 lines
12 KiB
Go
package notify
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/minio/minio/cmd/config"
|
|
"github.com/minio/minio/pkg/event/target"
|
|
)
|
|
|
|
// SetNotifyKafka - helper for config migration from older config.
|
|
func SetNotifyKafka(s config.Config, kName string, cfg target.KafkaArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyKafkaSubSys][kName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaBrokers,
|
|
Value: func() string {
|
|
var brokers []string
|
|
for _, broker := range cfg.Brokers {
|
|
brokers = append(brokers, broker.String())
|
|
}
|
|
return strings.Join(brokers, config.ValueSeparator)
|
|
}(),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaTopic,
|
|
Value: cfg.Topic,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaClientTLSCert,
|
|
Value: cfg.TLS.ClientTLSCert,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaClientTLSKey,
|
|
Value: cfg.TLS.ClientTLSKey,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaTLS,
|
|
Value: config.FormatBool(cfg.TLS.Enable),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaTLSSkipVerify,
|
|
Value: config.FormatBool(cfg.TLS.SkipVerify),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaTLSClientAuth,
|
|
Value: strconv.Itoa(int(cfg.TLS.ClientAuth)),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaSASL,
|
|
Value: config.FormatBool(cfg.SASL.Enable),
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaSASLUsername,
|
|
Value: cfg.SASL.User,
|
|
},
|
|
config.KV{
|
|
Key: target.KafkaSASLPassword,
|
|
Value: cfg.SASL.Password,
|
|
},
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyAMQP - helper for config migration from older config.
|
|
func SetNotifyAMQP(s config.Config, amqpName string, cfg target.AMQPArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyAMQPSubSys][amqpName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpURL,
|
|
Value: cfg.URL.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpExchange,
|
|
Value: cfg.Exchange,
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpRoutingKey,
|
|
Value: cfg.RoutingKey,
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpExchangeType,
|
|
Value: cfg.ExchangeType,
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpDeliveryMode,
|
|
Value: strconv.Itoa(int(cfg.DeliveryMode)),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpMandatory,
|
|
Value: config.FormatBool(cfg.Mandatory),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpInternal,
|
|
Value: config.FormatBool(cfg.Immediate),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpDurable,
|
|
Value: config.FormatBool(cfg.Durable),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpNoWait,
|
|
Value: config.FormatBool(cfg.NoWait),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpAutoDeleted,
|
|
Value: config.FormatBool(cfg.AutoDeleted),
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.AmqpQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyES - helper for config migration from older config.
|
|
func SetNotifyES(s config.Config, esName string, cfg target.ElasticsearchArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyESSubSys][esName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.ElasticFormat,
|
|
Value: cfg.Format,
|
|
},
|
|
config.KV{
|
|
Key: target.ElasticURL,
|
|
Value: cfg.URL.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.ElasticIndex,
|
|
Value: cfg.Index,
|
|
},
|
|
config.KV{
|
|
Key: target.ElasticQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.ElasticQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyRedis - helper for config migration from older config.
|
|
func SetNotifyRedis(s config.Config, redisName string, cfg target.RedisArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyRedisSubSys][redisName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.RedisFormat,
|
|
Value: cfg.Format,
|
|
},
|
|
config.KV{
|
|
Key: target.RedisAddress,
|
|
Value: cfg.Addr.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.RedisPassword,
|
|
Value: cfg.Password,
|
|
},
|
|
config.KV{
|
|
Key: target.RedisKey,
|
|
Value: cfg.Key,
|
|
},
|
|
config.KV{
|
|
Key: target.RedisQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.RedisQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyWebhook - helper for config migration from older config.
|
|
func SetNotifyWebhook(s config.Config, whName string, cfg target.WebhookArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyWebhookSubSys][whName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookEndpoint,
|
|
Value: cfg.Endpoint.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookAuthToken,
|
|
Value: cfg.AuthToken,
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyPostgres - helper for config migration from older config.
|
|
func SetNotifyPostgres(s config.Config, psqName string, cfg target.PostgreSQLArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyPostgresSubSys][psqName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresFormat,
|
|
Value: cfg.Format,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresConnectionString,
|
|
Value: cfg.ConnectionString,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresTable,
|
|
Value: cfg.Table,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresHost,
|
|
Value: cfg.Host.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresPort,
|
|
Value: cfg.Port,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresUsername,
|
|
Value: cfg.User,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresPassword,
|
|
Value: cfg.Password,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresDatabase,
|
|
Value: cfg.Database,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.PostgresQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyNSQ - helper for config migration from older config.
|
|
func SetNotifyNSQ(s config.Config, nsqName string, cfg target.NSQArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyNSQSubSys][nsqName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.NSQAddress,
|
|
Value: cfg.NSQDAddress.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.NSQTopic,
|
|
Value: cfg.Topic,
|
|
},
|
|
config.KV{
|
|
Key: target.NSQTLS,
|
|
Value: config.FormatBool(cfg.TLS.Enable),
|
|
},
|
|
config.KV{
|
|
Key: target.NSQTLSSkipVerify,
|
|
Value: config.FormatBool(cfg.TLS.SkipVerify),
|
|
},
|
|
config.KV{
|
|
Key: target.NSQQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.NSQQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyNATS - helper for config migration from older config.
|
|
func SetNotifyNATS(s config.Config, natsName string, cfg target.NATSArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyNATSSubSys][natsName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSAddress,
|
|
Value: cfg.Address.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSSubject,
|
|
Value: cfg.Subject,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSUsername,
|
|
Value: cfg.Username,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSPassword,
|
|
Value: cfg.Password,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSToken,
|
|
Value: cfg.Token,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSCertAuthority,
|
|
Value: cfg.CertAuthority,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSClientCert,
|
|
Value: cfg.ClientCert,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSClientKey,
|
|
Value: cfg.ClientKey,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSTLS,
|
|
Value: config.FormatBool(cfg.Secure),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSTLSSkipVerify,
|
|
Value: config.FormatBool(cfg.Secure),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSPingInterval,
|
|
Value: strconv.FormatInt(cfg.PingInterval, 10),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSStreaming,
|
|
Value: func() string {
|
|
if cfg.Streaming.Enable {
|
|
return config.EnableOn
|
|
}
|
|
return config.EnableOff
|
|
}(),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSStreamingClusterID,
|
|
Value: cfg.Streaming.ClusterID,
|
|
},
|
|
config.KV{
|
|
Key: target.NATSStreamingAsync,
|
|
Value: config.FormatBool(cfg.Streaming.Async),
|
|
},
|
|
config.KV{
|
|
Key: target.NATSStreamingMaxPubAcksInFlight,
|
|
Value: strconv.Itoa(cfg.Streaming.MaxPubAcksInflight),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyMySQL - helper for config migration from older config.
|
|
func SetNotifyMySQL(s config.Config, sqlName string, cfg target.MySQLArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyMySQLSubSys][sqlName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLFormat,
|
|
Value: cfg.Format,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLDSNString,
|
|
Value: cfg.DSN,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLTable,
|
|
Value: cfg.Table,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLHost,
|
|
Value: cfg.Host.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLPort,
|
|
Value: cfg.Port,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLUsername,
|
|
Value: cfg.User,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLPassword,
|
|
Value: cfg.Password,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLDatabase,
|
|
Value: cfg.Database,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.MySQLQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetNotifyMQTT - helper for config migration from older config.
|
|
func SetNotifyMQTT(s config.Config, mqttName string, cfg target.MQTTArgs) error {
|
|
if !cfg.Enable {
|
|
return nil
|
|
}
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s[config.NotifyMQTTSubSys][mqttName] = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOn,
|
|
},
|
|
config.KV{
|
|
Key: target.MqttBroker,
|
|
Value: cfg.Broker.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.MqttTopic,
|
|
Value: cfg.Topic,
|
|
},
|
|
config.KV{
|
|
Key: target.MqttQoS,
|
|
Value: fmt.Sprintf("%d", cfg.QoS),
|
|
},
|
|
config.KV{
|
|
Key: target.MqttUsername,
|
|
Value: cfg.User,
|
|
},
|
|
config.KV{
|
|
Key: target.MqttPassword,
|
|
Value: cfg.Password,
|
|
},
|
|
config.KV{
|
|
Key: target.MqttReconnectInterval,
|
|
Value: cfg.MaxReconnectInterval.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.MqttKeepAliveInterval,
|
|
Value: cfg.KeepAlive.String(),
|
|
},
|
|
config.KV{
|
|
Key: target.MqttQueueDir,
|
|
Value: cfg.QueueDir,
|
|
},
|
|
config.KV{
|
|
Key: target.MqttQueueLimit,
|
|
Value: strconv.Itoa(int(cfg.QueueLimit)),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|