mirror of https://github.com/minio/minio.git
124 lines
2.9 KiB
Go
124 lines
2.9 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"io/ioutil"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// mqttNotify describes a single MQTT notification target. The JSON tags
// define how each field is named when the configuration is (de)serialized.
type mqttNotify struct {
	// Enable switches the target on. Validate and dialMQTT both
	// short-circuit when it is false.
	Enable bool `json:"enable"`

	// Broker is the broker address; it is checked with checkURL and
	// handed to the MQTT client as its broker.
	Broker string `json:"broker"`

	// Topic is the topic that Fire publishes entries to.
	Topic string `json:"topic"`

	// QoS is the quality-of-service value used when publishing; it is
	// converted to a byte at publish time.
	QoS int `json:"qos"`

	// ClientID, User and Password are passed through to the MQTT client
	// options when dialing.
	ClientID string `json:"clientId"`
	User     string `json:"username"`
	Password string `json:"password"`
}
|
|
|
|
func (m *mqttNotify) Validate() error {
|
|
if !m.Enable {
|
|
return nil
|
|
}
|
|
if _, err := checkURL(m.Broker); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type mqttConn struct {
|
|
params mqttNotify
|
|
Client MQTT.Client
|
|
}
|
|
|
|
func dialMQTT(mqttL mqttNotify) (mqttConn, error) {
|
|
if !mqttL.Enable {
|
|
return mqttConn{}, errNotifyNotEnabled
|
|
}
|
|
connOpts := &MQTT.ClientOptions{
|
|
ClientID: mqttL.ClientID,
|
|
CleanSession: true,
|
|
Username: mqttL.User,
|
|
Password: mqttL.Password,
|
|
MaxReconnectInterval: 1 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
TLSConfig: tls.Config{RootCAs: globalRootCAs},
|
|
}
|
|
connOpts.AddBroker(mqttL.Broker)
|
|
client := MQTT.NewClient(connOpts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
return mqttConn{}, token.Error()
|
|
}
|
|
return mqttConn{Client: client, params: mqttL}, nil
|
|
}
|
|
|
|
func newMQTTNotify(accountID string) (*logrus.Logger, error) {
|
|
mqttL := serverConfig.Notify.GetMQTTByID(accountID)
|
|
|
|
//connect to MQTT Server
|
|
mqttC, err := dialMQTT(mqttL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mqttLog := logrus.New()
|
|
|
|
// Disable writing to console.
|
|
mqttLog.Out = ioutil.Discard
|
|
|
|
// Add a mqtt hook.
|
|
mqttLog.Hooks.Add(mqttC)
|
|
|
|
// Set default JSON formatter
|
|
mqttLog.Formatter = new(logrus.JSONFormatter)
|
|
|
|
// successfully enabled all MQTTs
|
|
return mqttLog, nil
|
|
}
|
|
|
|
// Fire if called when an event should be sent to the message broker.
|
|
func (q mqttConn) Fire(entry *logrus.Entry) error {
|
|
body, err := entry.String()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !q.Client.IsConnected() {
|
|
if token := q.Client.Connect(); token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
}
|
|
token := q.Client.Publish(q.params.Topic, byte(q.params.QoS), false, body)
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Levels is available logging levels.
|
|
func (q mqttConn) Levels() []logrus.Level {
|
|
return []logrus.Level{
|
|
logrus.InfoLevel,
|
|
}
|
|
}
|