mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
Improve MQTT token registration retry (#8397)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
||||
* MinIO Cloud Storage, (C) 2018-2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -88,6 +88,7 @@ type MQTTTarget struct {
|
||||
args MQTTArgs
|
||||
client mqtt.Client
|
||||
store Store
|
||||
quitCh chan struct{}
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})
|
||||
}
|
||||
|
||||
@@ -110,15 +111,15 @@ func (target *MQTTTarget) send(eventData event.Event) error {
|
||||
}
|
||||
|
||||
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
return token.Error()
|
||||
if !token.WaitTimeout(reconnectInterval * time.Second) {
|
||||
return errNotConnected
|
||||
}
|
||||
return nil
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
// Send - reads an event from store and sends it to MQTT.
|
||||
func (target *MQTTTarget) Send(eventKey string) error {
|
||||
// Do not send if the connection is not active.
|
||||
if !target.client.IsConnectionOpen() {
|
||||
return errNotConnected
|
||||
}
|
||||
@@ -158,11 +159,19 @@ func (target *MQTTTarget) Save(eventData event.Event) error {
|
||||
|
||||
// Close - does nothing and available for interface compatibility.
|
||||
func (target *MQTTTarget) Close() error {
|
||||
target.client.Disconnect(100)
|
||||
close(target.quitCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewMQTTTarget - creates new MQTT target.
|
||||
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MQTTTarget, error) {
|
||||
if args.MaxReconnectInterval == 0 {
|
||||
// Default interval
|
||||
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
|
||||
args.MaxReconnectInterval = 10 * time.Minute
|
||||
}
|
||||
|
||||
options := mqtt.NewClientOptions().
|
||||
SetClientID("").
|
||||
SetCleanSession(true).
|
||||
@@ -175,36 +184,38 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
|
||||
|
||||
client := mqtt.NewClient(options)
|
||||
|
||||
// The client should establish a first time connection.
|
||||
// Connect() should be successful atleast once to publish events.
|
||||
token := client.Connect()
|
||||
|
||||
target := &MQTTTarget{
|
||||
id: event.TargetID{ID: id, Name: "mqtt"},
|
||||
args: args,
|
||||
client: client,
|
||||
quitCh: make(chan struct{}),
|
||||
loggerOnce: loggerOnce,
|
||||
}
|
||||
|
||||
// Retries until the clientID gets registered.
|
||||
token := client.Connect()
|
||||
retryRegister := func() {
|
||||
// Repeat the pings until the client registers the clientId and receives a token.
|
||||
for {
|
||||
var terr error
|
||||
retry:
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-target.quitCh:
|
||||
return
|
||||
default:
|
||||
terr = token.Error()
|
||||
if token.Wait() && terr == nil {
|
||||
// Connected
|
||||
ok := token.WaitTimeout(reconnectInterval * time.Second)
|
||||
if ok && token.Error() != nil {
|
||||
target.loggerOnce(context.Background(),
|
||||
fmt.Errorf("Previous connect failed with %s attempting a reconnect",
|
||||
token.Error()),
|
||||
target.ID())
|
||||
time.Sleep(reconnectInterval * time.Second)
|
||||
token = client.Connect()
|
||||
goto retry
|
||||
}
|
||||
if ok {
|
||||
// Successfully connected.
|
||||
return
|
||||
}
|
||||
// Reconnecting
|
||||
time.Sleep(reconnectInterval * time.Second)
|
||||
terr = fmt.Errorf("Previous connect failed with %s, attempting a reconnect", terr)
|
||||
target.loggerOnce(context.Background(), terr, target.ID())
|
||||
token = client.Connect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user