Add support for MQTT server as a notification target (#4474)

This implementation is similar to AMQP notifications:

* Notifications are published on a single topic as a JSON feed
* Topic is configurable, as is the QoS. Uses the paho.mqtt.golang
  library for the mqtt connection, and supports connections over tcp
  and websockets, with optional secure tls support.
* Additionally the minio server configuration has been bumped up
  so mqtt configuration can be added.
* Configuration migration code is added with tests.

MQTT is an ISO standard M2M/IoT messaging protocol and was
originally designed for applications for limited bandwidth
networks. Today it's use is growing in the IoT space.
This commit is contained in:
splinter98
2017-06-15 01:27:03 +01:00
committed by Harshavardhana
parent af8071c86a
commit 8293f546af
60 changed files with 5916 additions and 32 deletions

View File

@@ -144,6 +144,9 @@ func isValidQueueID(queueARN string) bool {
if isAMQPQueue(sqsARN) { // AMQP eueue.
amqpN := serverConfig.Notify.GetAMQPByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
} else if isMQTTQueue(sqsARN) {
mqttN := serverConfig.Notify.GetMQTTByID(sqsARN.AccountID)
return mqttN.Enable && mqttN.Broker != ""
} else if isNATSQueue(sqsARN) {
natsN := serverConfig.Notify.GetNATSByID(sqsARN.AccountID)
return natsN.Enable && natsN.Address != ""
@@ -251,6 +254,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
// Unmarshals input value of AWS ARN format into minioSqs object.
// Returned value represents minio sqs types, currently supported are
// - amqp
// - mqtt
// - nats
// - elasticsearch
// - redis
@@ -273,6 +277,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
switch sqsType {
case queueTypeAMQP:
mSqs.Type = queueTypeAMQP
case queueTypeMQTT:
mSqs.Type = queueTypeMQTT
case queueTypeNATS:
mSqs.Type = queueTypeNATS
case queueTypeElastic:

View File

@@ -358,6 +358,11 @@ func TestUnmarshalSQSARN(t *testing.T) {
queueARN: "arn:minio:sqs:us-east-1:1:amqp",
Type: "amqp",
},
// Valid mqtt queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:mqtt",
Type: "mqtt",
},
// Invalid empty queue arn.
{
queueARN: "",

View File

@@ -141,7 +141,13 @@ func migrateConfig() error {
return err
}
fallthrough
case v18:
case "18":
// Migrate version '17' to '18'.
if err = migrateV18ToV19(); err != nil {
return err
}
fallthrough
case v19:
// No migration needed. this always points to current version.
err = nil
}
@@ -1366,3 +1372,109 @@ func migrateV17ToV18() error {
log.Printf(configMigrateMSGTemplate, configFile, cv17.Version, srvConfig.Version)
return nil
}
func migrateV18ToV19() error {
configFile := getConfigFile()
cv18 := &serverConfigV18{}
_, err := quick.Load(configFile, cv18)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("Unable to load config version 18. %v", err)
}
if cv18.Version != "18" {
return nil
}
// Copy over fields from V18 into V19 config struct
srvConfig := &serverConfigV18{
Logger: &loggers{},
Notify: &notifier{},
}
srvConfig.Version = "19"
srvConfig.Credential = cv18.Credential
srvConfig.Region = cv18.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = globalMinioDefaultRegion
}
srvConfig.Logger.Console = cv18.Logger.Console
srvConfig.Logger.File = cv18.Logger.File
// check and set notifiers config
if len(cv18.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
// New deliveryMode parameter is added for AMQP,
// default value is already 0, so nothing to
// explicitly migrate here.
srvConfig.Notify.AMQP = cv18.Notify.AMQP
}
if len(cv18.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{
Format: formatNamespace,
}
} else {
srvConfig.Notify.ElasticSearch = cv18.Notify.ElasticSearch
}
if len(cv18.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.Redis["1"] = redisNotify{
Format: formatNamespace,
}
} else {
srvConfig.Notify.Redis = cv18.Notify.Redis
}
if len(cv18.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{
Format: formatNamespace,
}
} else {
srvConfig.Notify.PostgreSQL = cv18.Notify.PostgreSQL
}
if len(cv18.Notify.Kafka) == 0 {
srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
srvConfig.Notify.Kafka["1"] = kafkaNotify{}
} else {
srvConfig.Notify.Kafka = cv18.Notify.Kafka
}
if len(cv18.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = cv18.Notify.NATS
}
if len(cv18.Notify.Webhook) == 0 {
srvConfig.Notify.Webhook = make(map[string]webhookNotify)
srvConfig.Notify.Webhook["1"] = webhookNotify{}
} else {
srvConfig.Notify.Webhook = cv18.Notify.Webhook
}
if len(cv18.Notify.MySQL) == 0 {
srvConfig.Notify.MySQL = make(map[string]mySQLNotify)
srvConfig.Notify.MySQL["1"] = mySQLNotify{
Format: formatNamespace,
}
} else {
srvConfig.Notify.MySQL = cv18.Notify.MySQL
}
// V18 will not have mqtt support, so we add that here.
srvConfig.Notify.MQTT = make(map[string]mqttNotify)
srvConfig.Notify.MQTT["1"] = mqttNotify{}
// Load browser config from existing config in the file.
srvConfig.Browser = cv18.Browser
if err = quick.Save(configFile, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %v", cv18.Version, srvConfig.Version, err)
}
log.Printf(configMigrateMSGTemplate, configFile, cv18.Version, srvConfig.Version)
return nil
}

View File

@@ -122,11 +122,14 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
if err := migrateV17ToV18(); err != nil {
t.Fatal("migrate v17 to v18 should succeed when no config file is found")
}
if err := migrateV18ToV19(); err != nil {
t.Fatal("migrate v18 to v19 should succeed when no config file is found")
}
}
// Test if a config migration from v2 to v18 is successfully done
func TestServerConfigMigrateV2toV18(t *testing.T) {
// Test if a config migration from v2 to v19 is successfully done
func TestServerConfigMigrateV2toV19(t *testing.T) {
rootPath, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatalf("Init Test config failed")
@@ -166,7 +169,7 @@ func TestServerConfigMigrateV2toV18(t *testing.T) {
}
// Check the version number in the upgraded config file
expectedVersion := v18
expectedVersion := v19
if serverConfig.Version != expectedVersion {
t.Fatalf("Expect version "+expectedVersion+", found: %v", serverConfig.Version)
}
@@ -246,6 +249,9 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV17ToV18(); err == nil {
t.Fatal("migrateConfigV17ToV18() should fail with a corrupted json")
}
if err := migrateV18ToV19(); err == nil {
t.Fatal("migrateConfigV18ToV19() should fail with a corrupted json")
}
}
// Test if all migrate code returns error with corrupted config files

View File

@@ -449,3 +449,22 @@ type serverConfigV17 struct {
// Notification queue configuration.
Notify *notifier `json:"notify"`
}
// serverConfigV18 server configuration version '18' which is like
// version '17' except it adds support for "deliveryMode" parameter in
// the AMQP notification target.
type serverConfigV18 struct {
sync.RWMutex
Version string `json:"version"`
// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`
Browser BrowserFlag `json:"browser"`
// Additional error logging configuration.
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
}

View File

@@ -27,18 +27,17 @@ import (
)
// Config version
const v18 = "18"
const v19 = "19"
var (
// serverConfig server config.
serverConfig *serverConfigV18
serverConfig *serverConfigV19
serverConfigMu sync.RWMutex
)
// serverConfigV18 server configuration version '18' which is like
// version '17' except it adds support for "deliveryMode" parameter in
// the AMQP notification target.
type serverConfigV18 struct {
// serverConfigV19 server configuration version '19' which is like
// version '18' except it adds support for MQTT notifications.
type serverConfigV19 struct {
sync.RWMutex
Version string `json:"version"`
@@ -55,7 +54,7 @@ type serverConfigV18 struct {
}
// GetVersion get current config version.
func (s *serverConfigV18) GetVersion() string {
func (s *serverConfigV19) GetVersion() string {
s.RLock()
defer s.RUnlock()
@@ -63,7 +62,7 @@ func (s *serverConfigV18) GetVersion() string {
}
// SetRegion set new region.
func (s *serverConfigV18) SetRegion(region string) {
func (s *serverConfigV19) SetRegion(region string) {
s.Lock()
defer s.Unlock()
@@ -71,7 +70,7 @@ func (s *serverConfigV18) SetRegion(region string) {
}
// GetRegion get current region.
func (s *serverConfigV18) GetRegion() string {
func (s *serverConfigV19) GetRegion() string {
s.RLock()
defer s.RUnlock()
@@ -79,7 +78,7 @@ func (s *serverConfigV18) GetRegion() string {
}
// SetCredentials set new credentials.
func (s *serverConfigV18) SetCredential(creds credential) {
func (s *serverConfigV19) SetCredential(creds credential) {
s.Lock()
defer s.Unlock()
@@ -88,7 +87,7 @@ func (s *serverConfigV18) SetCredential(creds credential) {
}
// GetCredentials get current credentials.
func (s *serverConfigV18) GetCredential() credential {
func (s *serverConfigV19) GetCredential() credential {
s.RLock()
defer s.RUnlock()
@@ -96,7 +95,7 @@ func (s *serverConfigV18) GetCredential() credential {
}
// SetBrowser set if browser is enabled.
func (s *serverConfigV18) SetBrowser(b bool) {
func (s *serverConfigV19) SetBrowser(b bool) {
s.Lock()
defer s.Unlock()
@@ -105,7 +104,7 @@ func (s *serverConfigV18) SetBrowser(b bool) {
}
// GetCredentials get current credentials.
func (s *serverConfigV18) GetBrowser() bool {
func (s *serverConfigV19) GetBrowser() bool {
s.RLock()
defer s.RUnlock()
@@ -113,7 +112,7 @@ func (s *serverConfigV18) GetBrowser() bool {
}
// Save config.
func (s *serverConfigV18) Save() error {
func (s *serverConfigV19) Save() error {
s.RLock()
defer s.RUnlock()
@@ -121,9 +120,9 @@ func (s *serverConfigV18) Save() error {
return quick.Save(getConfigFile(), s)
}
func newServerConfigV18() *serverConfigV18 {
srvCfg := &serverConfigV18{
Version: v18,
func newServerConfigV19() *serverConfigV19 {
srvCfg := &serverConfigV19{
Version: v19,
Credential: mustGetNewCredential(),
Region: globalMinioDefaultRegion,
Browser: true,
@@ -137,6 +136,8 @@ func newServerConfigV18() *serverConfigV18 {
// Make sure to initialize notification configs.
srvCfg.Notify.AMQP = make(map[string]amqpNotify)
srvCfg.Notify.AMQP["1"] = amqpNotify{}
srvCfg.Notify.MQTT = make(map[string]mqttNotify)
srvCfg.Notify.MQTT["1"] = mqttNotify{}
srvCfg.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvCfg.Notify.ElasticSearch["1"] = elasticSearchNotify{}
srvCfg.Notify.Redis = make(map[string]redisNotify)
@@ -159,7 +160,7 @@ func newServerConfigV18() *serverConfigV18 {
// found, otherwise use default parameters
func newConfig() error {
// Initialize server config.
srvCfg := newServerConfigV18()
srvCfg := newServerConfigV19()
// If env is set override the credentials from config file.
if globalIsEnvCreds {
@@ -237,8 +238,8 @@ func checkDupJSONKeys(json string) error {
}
// getValidConfig - returns valid server configuration
func getValidConfig() (*serverConfigV18, error) {
srvCfg := &serverConfigV18{
func getValidConfig() (*serverConfigV19, error) {
srvCfg := &serverConfigV19{
Region: globalMinioDefaultRegion,
Browser: true,
}
@@ -248,8 +249,8 @@ func getValidConfig() (*serverConfigV18, error) {
return nil, err
}
if srvCfg.Version != v18 {
return nil, fmt.Errorf("configuration version mismatch. Expected: %s, Got: %s", v18, srvCfg.Version)
if srvCfg.Version != v19 {
return nil, fmt.Errorf("configuration version mismatch. Expected: %s, Got: %s", v19, srvCfg.Version)
}
// Load config file json and check for duplication json keys

View File

@@ -80,13 +80,21 @@ func TestServerConfig(t *testing.T) {
}
// Set new console logger.
// Set new Webhook notification id.
// Set new MySQL notification id.
serverConfig.Notify.SetMySQLByID("2", mySQLNotify{})
savedNotifyCfg6 := serverConfig.Notify.GetMySQLByID("2")
if !reflect.DeepEqual(savedNotifyCfg6, mySQLNotify{}) {
t.Errorf("Expecting Webhook config %#v found %#v", mySQLNotify{}, savedNotifyCfg6)
}
// Set new console logger.
// Set new MQTT notification id.
serverConfig.Notify.SetMQTTByID("2", mqttNotify{})
savedNotifyCfg7 := serverConfig.Notify.GetMQTTByID("2")
if !reflect.DeepEqual(savedNotifyCfg7, mqttNotify{}) {
t.Errorf("Expecting Webhook config %#v found %#v", mqttNotify{}, savedNotifyCfg7)
}
consoleLogger := NewConsoleLogger()
serverConfig.Logger.SetConsole(consoleLogger)
consoleCfg := serverConfig.Logger.GetConsole()
@@ -109,8 +117,8 @@ func TestServerConfig(t *testing.T) {
serverConfig.Logger.SetFile(fileLogger)
// Match version.
if serverConfig.GetVersion() != v18 {
t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v18)
if serverConfig.GetVersion() != v19 {
t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v19)
}
// Attempt to save.
@@ -223,7 +231,7 @@ func TestValidateConfig(t *testing.T) {
configPath := filepath.Join(rootPath, minioConfigFile)
v := v18
v := v19
testCases := []struct {
configData string
@@ -309,6 +317,9 @@ func TestValidateConfig(t *testing.T) {
// Test 28 - Test valid Format for Redis
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "namespace", "address": "example.com:80", "password": "xxx", "key": "key1" } }}}`, true},
// Test 29 - Test MQTT
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mqtt": { "1": { "enable": true, "broker": "", "topic": "", "qos": 0, "clientId": "", "username": "", "password": ""}}}}`, false},
}
for i, testCase := range testCases {

View File

@@ -609,6 +609,25 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
}
// Load all mqtt targets, initialize their respective loggers.
for accountID, mqttN := range serverConfig.Notify.GetMQTT() {
if !mqttN.Enable {
continue
}
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeMQTT, newMQTTNotify); err != nil {
if _, ok := err.(net.Error); ok {
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
}
// Load all nats targets, initialize their respective loggers.
for accountID, natsN := range serverConfig.Notify.GetNATS() {
if !natsN.Enable {

View File

@@ -184,7 +184,7 @@ func newGatewayLayer(backendType gatewayBackend, endpoint, accessKey, secretKey
// only used in memory.
func newGatewayConfig(accessKey, secretKey, region string) error {
// Initialize server config.
srvCfg := newServerConfigV18()
srvCfg := newServerConfigV19()
// If env is set for a fresh start, save them to config file.
srvCfg.SetCredential(credential{

View File

@@ -32,6 +32,7 @@ type notifier struct {
Kafka kafkaConfigs `json:"kafka"`
Webhook webhookConfigs `json:"webhook"`
MySQL mySQLConfigs `json:"mysql"`
MQTT mqttConfigs `json:"mqtt"`
// Add new notification queues.
}
@@ -54,6 +55,25 @@ func (a amqpConfigs) Validate() error {
return nil
}
type mqttConfigs map[string]mqttNotify
func (a mqttConfigs) Clone() mqttConfigs {
a2 := make(mqttConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
func (a mqttConfigs) Validate() error {
for k, v := range a {
if err := v.Validate(); err != nil {
return fmt.Errorf("MQTT [%s] configuration invalid: %s", k, err.Error())
}
}
return nil
}
type natsConfigs map[string]natsNotify
func (a natsConfigs) Clone() natsConfigs {
@@ -215,6 +235,9 @@ func (n *notifier) Validate() error {
if err := n.MySQL.Validate(); err != nil {
return err
}
if err := n.MQTT.Validate(); err != nil {
return err
}
return nil
}
@@ -236,6 +259,24 @@ func (n *notifier) GetAMQPByID(accountID string) amqpNotify {
return n.AMQP[accountID]
}
func (n *notifier) SetMQTTByID(accountID string, mqttn mqttNotify) {
n.Lock()
defer n.Unlock()
n.MQTT[accountID] = mqttn
}
func (n *notifier) GetMQTT() map[string]mqttNotify {
n.RLock()
defer n.RUnlock()
return n.MQTT.Clone()
}
func (n *notifier) GetMQTTByID(accountID string) mqttNotify {
n.RLock()
defer n.RUnlock()
return n.MQTT[accountID]
}
func (n *notifier) SetNATSByID(accountID string, natsn natsNotify) {
n.Lock()
defer n.Unlock()

View File

@@ -30,6 +30,8 @@ const (
// Static string indicating queue type 'amqp'.
queueTypeAMQP = "amqp"
// Static string indicating queue type 'mqtt'.
queueTypeMQTT = "mqtt"
// Static string indicating queue type 'nats'.
queueTypeNATS = "nats"
// Static string indicating queue type 'elasticsearch'.
@@ -80,6 +82,25 @@ func isAMQPQueue(sqsArn arnSQS) bool {
return true
}
// Returns true if mqttARN is for an MQTT queue.
func isMQTTQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeMQTT {
return false
}
mqttL := serverConfig.Notify.GetMQTTByID(sqsArn.AccountID)
if !mqttL.Enable {
return false
}
// Connect to mqtt server to validate.
mqttC, err := dialMQTT(mqttL)
if err != nil {
errorIf(err, "Unable to connect to mqtt service. %#v", mqttL)
return false
}
defer mqttC.Client.Disconnect(250)
return true
}
// Returns true if natsArn is for an NATS queue.
func isNATSQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeNATS {

123
cmd/notify-mqtt.go Normal file
View File

@@ -0,0 +1,123 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"crypto/tls"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type mqttNotify struct {
Enable bool `json:"enable"`
Broker string `json:"broker"`
Topic string `json:"topic"`
QoS int `json:"qos"`
ClientID string `json:"clientId"`
User string `json:"username"`
Password string `json:"password"`
}
func (m *mqttNotify) Validate() error {
if !m.Enable {
return nil
}
if _, err := checkURL(m.Broker); err != nil {
return err
}
return nil
}
type mqttConn struct {
params mqttNotify
Client MQTT.Client
}
func dialMQTT(mqttL mqttNotify) (mqttConn, error) {
if !mqttL.Enable {
return mqttConn{}, errNotifyNotEnabled
}
connOpts := &MQTT.ClientOptions{
ClientID: mqttL.ClientID,
CleanSession: true,
Username: mqttL.User,
Password: mqttL.Password,
MaxReconnectInterval: 1 * time.Second,
KeepAlive: 30 * time.Second,
TLSConfig: tls.Config{RootCAs: globalRootCAs},
}
connOpts.AddBroker(mqttL.Broker)
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return mqttConn{}, token.Error()
}
return mqttConn{Client: client, params: mqttL}, nil
}
func newMQTTNotify(accountID string) (*logrus.Logger, error) {
mqttL := serverConfig.Notify.GetMQTTByID(accountID)
//connect to MQTT Server
mqttC, err := dialMQTT(mqttL)
if err != nil {
return nil, err
}
mqttLog := logrus.New()
// Disable writing to console.
mqttLog.Out = ioutil.Discard
// Add a mqtt hook.
mqttLog.Hooks.Add(mqttC)
// Set default JSON formatter
mqttLog.Formatter = new(logrus.JSONFormatter)
// successfully enabled all MQTTs
return mqttLog, nil
}
// Fire if called when an event should be sent to the message broker.
func (q mqttConn) Fire(entry *logrus.Entry) error {
body, err := entry.String()
if err != nil {
return err
}
if !q.Client.IsConnected() {
if token := q.Client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
}
token := q.Client.Publish(q.params.Topic, byte(q.params.QoS), false, body)
if token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
// Levels is available logging levels.
func (q mqttConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}
}

View File

@@ -108,7 +108,7 @@ func initConfig() {
// Config file does not exist, we create it fresh and return upon success.
if isFile(getConfigFile()) {
fatalIf(migrateConfig(), "Config migration failed.")
fatalIf(loadConfig(), "Unable to load config version: '%s'.", v18)
fatalIf(loadConfig(), "Unable to load config version: '%s'.", v19)
} else {
fatalIf(newConfig(), "Unable to initialize minio config for the first time.")
log.Println("Created minio configuration file successfully at " + getConfigDir())