clientID removed in the MQTT config (#7157)

More than one client can't use the same clientID for MQTT connection. 
This causes problem in distributed deployments where config is shared 
across nodes, as each Minio instance tries to connect to MQTT using the
same clientID.

This commit removes the clientID field in config, and allows
MQTT client to create random clientID for each node.
This commit is contained in:
Praveen raj Mani 2019-01-29 15:00:15 +05:30 committed by Nitish Tiwari
parent 91c839ad28
commit fad59da29d
6 changed files with 15 additions and 12 deletions

View File

@ -119,7 +119,6 @@ var (
"broker": "", "broker": "",
"topic": "", "topic": "",
"qos": 0, "qos": 0,
"clientId": "",
"username": "", "username": "",
"password": "", "password": "",
"reconnectInterval": 0, "reconnectInterval": 0,

View File

@ -233,7 +233,7 @@ func TestValidateConfig(t *testing.T) {
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "namespace", "address": "example.com:80", "password": "xxx", "key": "key1" } }}}`, true}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "namespace", "address": "example.com:80", "password": "xxx", "key": "key1" } }}}`, true},
// Test 27 - Test MQTT // Test 27 - Test MQTT
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mqtt": { "1": { "enable": true, "broker": "", "topic": "", "qos": 0, "clientId": "", "username": "", "password": "", "queueDir": ""}}}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mqtt": { "1": { "enable": true, "broker": "", "topic": "", "qos": 0, "username": "", "password": "", "queueDir": ""}}}}`, false},
// Test 28 - Test NSQ // Test 28 - Test NSQ
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nsq": { "1": { "enable": true, "nsqdAddress": "", "topic": ""} }}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nsq": { "1": { "enable": true, "nsqdAddress": "", "topic": ""} }}}`, false},

View File

@ -893,7 +893,7 @@ type serverConfigV32 struct {
} `json:"policy"` } `json:"policy"`
} }
// serverConfigV33 is just like version '32', removes clientID from NATS and adds queueDir with MQTT. // serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir with MQTT.
type serverConfigV33 struct { type serverConfigV33 struct {
quick.Config `json:"-"` // ignore interfaces quick.Config `json:"-"` // ignore interfaces

View File

@ -160,7 +160,6 @@ The Minio server configuration file is stored on the backend in json format. The
| `broker` | _string_ | (Required) MQTT server endpoint, e.g. `tcp://localhost:1883` | | `broker` | _string_ | (Required) MQTT server endpoint, e.g. `tcp://localhost:1883` |
| `topic` | _string_ | (Required) Name of the MQTT topic to publish on, e.g. `minio` | | `topic` | _string_ | (Required) Name of the MQTT topic to publish on, e.g. `minio` |
| `qos` | _int_ | Set the Quality of Service Level | | `qos` | _int_ | Set the Quality of Service Level |
| `clientId` | _string_ | Unique ID for the MQTT broker to identify Minio |
| `username` | _string_ | Username to connect to the MQTT server (if required) | | `username` | _string_ | Username to connect to the MQTT server (if required) |
| `password` | _string_ | Password to connect to the MQTT server (if required) | | `password` | _string_ | Password to connect to the MQTT server (if required) |
| `queueDir` | _string_ | Persistent store for events when MQTT broker is offline | | `queueDir` | _string_ | Persistent store for events when MQTT broker is offline |
@ -174,7 +173,6 @@ An example configuration for MQTT is shown below:
"broker": "tcp://localhost:1883", "broker": "tcp://localhost:1883",
"topic": "minio", "topic": "minio",
"qos": 1, "qos": 1,
"clientId": "minio",
"username": "", "username": "",
"password": "", "password": "",
"queueDir": "" "queueDir": ""
@ -222,12 +220,14 @@ import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc)) print("Connected with result code "+str(rc))
client.subscribe("minio") # qos level is set to 1
client.subscribe("minio", 1)
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
print(msg.payload) print(msg.payload)
client = mqtt.Client() # client_id is a randomly generated unique ID for the mqtt broker to identify the connection.
client = mqtt.Client(client_id="myclientid",clean_session=False)
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message client.on_message = on_message

View File

@ -82,7 +82,6 @@
"broker": "", "broker": "",
"topic": "", "topic": "",
"qos": 0, "qos": 0,
"clientId": "",
"username": "", "username": "",
"password": "", "password": "",
"reconnectInterval": 0, "reconnectInterval": 0,

View File

@ -36,7 +36,6 @@ type MQTTArgs struct {
Broker xnet.URL `json:"broker"` Broker xnet.URL `json:"broker"`
Topic string `json:"topic"` Topic string `json:"topic"`
QoS byte `json:"qos"` QoS byte `json:"qos"`
ClientID string `json:"clientId"`
User string `json:"username"` User string `json:"username"`
Password string `json:"password"` Password string `json:"password"`
MaxReconnectInterval time.Duration `json:"reconnectInterval"` MaxReconnectInterval time.Duration `json:"reconnectInterval"`
@ -59,9 +58,15 @@ func (m MQTTArgs) Validate() error {
default: default:
return errors.New("unknown protocol in broker address") return errors.New("unknown protocol in broker address")
} }
if m.QueueDir != "" && !filepath.IsAbs(m.QueueDir) { if m.QueueDir != "" {
if !filepath.IsAbs(m.QueueDir) {
return errors.New("queueDir path should be absolute") return errors.New("queueDir path should be absolute")
} }
if m.QoS == 0 {
return errors.New("qos should be set to 1 or 2 if queueDir is set")
}
}
return nil return nil
} }
@ -120,7 +125,7 @@ func (target *MQTTTarget) Close() error {
// NewMQTTTarget - creates new MQTT target. // NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) { func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
options := mqtt.NewClientOptions(). options := mqtt.NewClientOptions().
SetClientID(args.ClientID). SetClientID("").
SetCleanSession(true). SetCleanSession(true).
SetUsername(args.User). SetUsername(args.User).
SetPassword(args.Password). SetPassword(args.Password).