mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
1bfbe354f5
This is a regression from #14037, distributed setups with MQTT was not working anymore. According to MQTT spec it is expected this is unique per server. We shall proceed to use unix nano timestamp hex value instead here.
289 lines
8.1 KiB
Go
289 lines
8.1 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package target
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/minio/minio/internal/event"
|
|
xnet "github.com/minio/pkg/net"
|
|
)
|
|
|
|
const (
|
|
reconnectInterval = 5 // In Seconds
|
|
storePrefix = "minio"
|
|
)
|
|
|
|
// MQTT input constants
|
|
const (
|
|
MqttBroker = "broker"
|
|
MqttTopic = "topic"
|
|
MqttQoS = "qos"
|
|
MqttUsername = "username"
|
|
MqttPassword = "password"
|
|
MqttReconnectInterval = "reconnect_interval"
|
|
MqttKeepAliveInterval = "keep_alive_interval"
|
|
MqttQueueDir = "queue_dir"
|
|
MqttQueueLimit = "queue_limit"
|
|
|
|
EnvMQTTEnable = "MINIO_NOTIFY_MQTT_ENABLE"
|
|
EnvMQTTBroker = "MINIO_NOTIFY_MQTT_BROKER"
|
|
EnvMQTTTopic = "MINIO_NOTIFY_MQTT_TOPIC"
|
|
EnvMQTTQoS = "MINIO_NOTIFY_MQTT_QOS"
|
|
EnvMQTTUsername = "MINIO_NOTIFY_MQTT_USERNAME"
|
|
EnvMQTTPassword = "MINIO_NOTIFY_MQTT_PASSWORD"
|
|
EnvMQTTReconnectInterval = "MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL"
|
|
EnvMQTTKeepAliveInterval = "MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"
|
|
EnvMQTTQueueDir = "MINIO_NOTIFY_MQTT_QUEUE_DIR"
|
|
EnvMQTTQueueLimit = "MINIO_NOTIFY_MQTT_QUEUE_LIMIT"
|
|
)
|
|
|
|
// MQTTArgs - MQTT target arguments.
|
|
type MQTTArgs struct {
|
|
Enable bool `json:"enable"`
|
|
Broker xnet.URL `json:"broker"`
|
|
Topic string `json:"topic"`
|
|
QoS byte `json:"qos"`
|
|
User string `json:"username"`
|
|
Password string `json:"password"`
|
|
MaxReconnectInterval time.Duration `json:"reconnectInterval"`
|
|
KeepAlive time.Duration `json:"keepAliveInterval"`
|
|
RootCAs *x509.CertPool `json:"-"`
|
|
QueueDir string `json:"queueDir"`
|
|
QueueLimit uint64 `json:"queueLimit"`
|
|
}
|
|
|
|
// Validate MQTTArgs fields
|
|
func (m MQTTArgs) Validate() error {
|
|
if !m.Enable {
|
|
return nil
|
|
}
|
|
u, err := xnet.ParseURL(m.Broker.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch u.Scheme {
|
|
case "ws", "wss", "tcp", "ssl", "tls", "tcps":
|
|
default:
|
|
return errors.New("unknown protocol in broker address")
|
|
}
|
|
if m.QueueDir != "" {
|
|
if !filepath.IsAbs(m.QueueDir) {
|
|
return errors.New("queueDir path should be absolute")
|
|
}
|
|
if m.QoS == 0 {
|
|
return errors.New("qos should be set to 1 or 2 if queueDir is set")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MQTTTarget - MQTT target.
|
|
type MQTTTarget struct {
|
|
id event.TargetID
|
|
args MQTTArgs
|
|
client mqtt.Client
|
|
store Store
|
|
quitCh chan struct{}
|
|
loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})
|
|
}
|
|
|
|
// ID - returns target ID.
|
|
func (target *MQTTTarget) ID() event.TargetID {
|
|
return target.id
|
|
}
|
|
|
|
// HasQueueStore - Checks if the queueStore has been configured for the target
|
|
func (target *MQTTTarget) HasQueueStore() bool {
|
|
return target.store != nil
|
|
}
|
|
|
|
// IsActive - Return true if target is up and active
|
|
func (target *MQTTTarget) IsActive() (bool, error) {
|
|
if !target.client.IsConnectionOpen() {
|
|
return false, errNotConnected
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// send - sends an event to the mqtt.
|
|
func (target *MQTTTarget) send(eventData event.Event) error {
|
|
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key := eventData.S3.Bucket.Name + "/" + objectName
|
|
|
|
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
|
|
if !token.WaitTimeout(reconnectInterval * time.Second) {
|
|
return errNotConnected
|
|
}
|
|
return token.Error()
|
|
}
|
|
|
|
// Send - reads an event from store and sends it to MQTT.
|
|
func (target *MQTTTarget) Send(eventKey string) error {
|
|
// Do not send if the connection is not active.
|
|
_, err := target.IsActive()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
eventData, err := target.store.Get(eventKey)
|
|
if err != nil {
|
|
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
|
|
// Such events will not exist and wouldve been already been sent successfully.
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
if err = target.send(eventData); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete the event from store.
|
|
return target.store.Del(eventKey)
|
|
}
|
|
|
|
// Save - saves the events to the store if queuestore is configured, which will
|
|
// be replayed when the mqtt connection is active.
|
|
func (target *MQTTTarget) Save(eventData event.Event) error {
|
|
if target.store != nil {
|
|
return target.store.Put(eventData)
|
|
}
|
|
|
|
// Do not send if the connection is not active.
|
|
_, err := target.IsActive()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return target.send(eventData)
|
|
}
|
|
|
|
// Close - does nothing and available for interface compatibility.
|
|
func (target *MQTTTarget) Close() error {
|
|
target.client.Disconnect(100)
|
|
close(target.quitCh)
|
|
return nil
|
|
}
|
|
|
|
// NewMQTTTarget - creates new MQTT target.
|
|
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) {
|
|
if args.MaxReconnectInterval == 0 {
|
|
// Default interval
|
|
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
|
|
args.MaxReconnectInterval = 10 * time.Minute
|
|
}
|
|
|
|
if args.KeepAlive == 0 {
|
|
args.KeepAlive = 10 * time.Second
|
|
}
|
|
|
|
// Using hex here, to make sure we avoid 23
|
|
// character limit on client_id according to
|
|
// MQTT spec.
|
|
clientID := fmt.Sprintf("%x", time.Now().UnixNano())
|
|
|
|
options := mqtt.NewClientOptions().
|
|
SetClientID(clientID).
|
|
SetCleanSession(true).
|
|
SetUsername(args.User).
|
|
SetPassword(args.Password).
|
|
SetMaxReconnectInterval(args.MaxReconnectInterval).
|
|
SetKeepAlive(args.KeepAlive).
|
|
SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}).
|
|
AddBroker(args.Broker.String())
|
|
|
|
client := mqtt.NewClient(options)
|
|
|
|
target := &MQTTTarget{
|
|
id: event.TargetID{ID: id, Name: "mqtt"},
|
|
args: args,
|
|
client: client,
|
|
quitCh: make(chan struct{}),
|
|
loggerOnce: loggerOnce,
|
|
}
|
|
|
|
token := client.Connect()
|
|
retryRegister := func() {
|
|
for {
|
|
retry:
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
case <-target.quitCh:
|
|
return
|
|
default:
|
|
ok := token.WaitTimeout(reconnectInterval * time.Second)
|
|
if ok && token.Error() != nil {
|
|
target.loggerOnce(context.Background(),
|
|
fmt.Errorf("Previous connect failed with %w attempting a reconnect",
|
|
token.Error()),
|
|
target.ID())
|
|
time.Sleep(reconnectInterval * time.Second)
|
|
token = client.Connect()
|
|
goto retry
|
|
}
|
|
if ok {
|
|
// Successfully connected.
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if args.QueueDir != "" {
|
|
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
|
|
target.store = NewQueueStore(queueDir, args.QueueLimit)
|
|
if err := target.store.Open(); err != nil {
|
|
target.loggerOnce(context.Background(), err, target.ID())
|
|
return target, err
|
|
}
|
|
|
|
if !test {
|
|
go retryRegister()
|
|
// Replays the events from the store.
|
|
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
|
|
// Start replaying events from the store.
|
|
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
|
|
}
|
|
} else if token.Wait() && token.Error() != nil {
|
|
return target, token.Error()
|
|
}
|
|
return target, nil
|
|
}
|