More MQTT improvements.
This commit is contained in:
parent
268236b684
commit
51b7dc4fc2
|
@ -12,9 +12,13 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
|
||||||
obj.parent = parent;
|
obj.parent = parent;
|
||||||
obj.db = db;
|
obj.db = db;
|
||||||
obj.args = args;
|
obj.args = args;
|
||||||
obj.aedes = require("aedes")();
|
|
||||||
obj.handle = obj.aedes.handle;
|
|
||||||
obj.connections = {}; // NodesID --> client array
|
obj.connections = {}; // NodesID --> client array
|
||||||
|
const aedes = require("aedes")();
|
||||||
|
obj.handle = aedes.handle;
|
||||||
|
const allowedSubscriptionTopics = [ 'presence' ];
|
||||||
|
const denyError = new Error('denied');
|
||||||
|
var authError = new Error('Auth error')
|
||||||
|
authError.returnCode = 1
|
||||||
|
|
||||||
// Generate a username and password for MQTT login
|
// Generate a username and password for MQTT login
|
||||||
obj.generateLogin = function (meshid, nodeid) {
|
obj.generateLogin = function (meshid, nodeid) {
|
||||||
|
@ -25,19 +29,28 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
|
||||||
return { meshid: meshid, nodeid: nodeid, user: username, pass: parent.config.settings.mqtt.auth.keyid + ':' + nonce + ':' + parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + parent.config.settings.mqtt.auth.key).digest("base64") };
|
return { meshid: meshid, nodeid: nodeid, user: username, pass: parent.config.settings.mqtt.auth.keyid + ':' + nonce + ':' + parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + parent.config.settings.mqtt.auth.key).digest("base64") };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish a message to a specific nodeid & topic
|
||||||
|
obj.publish = function (nodeid, topic, message) {
|
||||||
|
var clients = obj.connections[nodeid];
|
||||||
|
if (clients == null) return;
|
||||||
|
if (typeof message == 'string') { message = new Buffer(message); }
|
||||||
|
for (var i in clients) { clients[i].publish({ cmd: 'publish', qos: 0, topic: topic, payload: message, retain: false }); }
|
||||||
|
}
|
||||||
|
|
||||||
// Connection Authentication
|
// Connection Authentication
|
||||||
obj.aedes.authenticate = function (client, username, password, callback) {
|
aedes.authenticate = function (client, username, password, callback) {
|
||||||
obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
||||||
|
console.log('MQTT Connect');
|
||||||
|
|
||||||
// Parse the username and password
|
// Parse the username and password
|
||||||
var usersplit = username.split(':');
|
var usersplit = username.split(':');
|
||||||
var passsplit = password.toString().split(':');
|
var passsplit = password.toString().split(':');
|
||||||
if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; }
|
if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; }
|
||||||
if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; }
|
if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; }
|
||||||
|
|
||||||
// Check authentication
|
// Check authentication
|
||||||
if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; }
|
if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; }
|
||||||
if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; }
|
if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; }
|
||||||
|
|
||||||
// Setup the identifiers
|
// Setup the identifiers
|
||||||
const xnodeid = usersplit[1];
|
const xnodeid = usersplit[1];
|
||||||
|
@ -49,16 +62,15 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
|
||||||
|
|
||||||
// Convert meshid from HEX to Base64 if needed
|
// Convert meshid from HEX to Base64 if needed
|
||||||
if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); }
|
if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); }
|
||||||
if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(null, false); return; }
|
if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(authError, null); return; }
|
||||||
|
|
||||||
|
// Set the client nodeid and meshid
|
||||||
client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid;
|
client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid;
|
||||||
client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid;
|
client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid;
|
||||||
|
|
||||||
//console.log(obj.generateLogin(client.xdbMeshKey, client.xdbNodeKey));
|
|
||||||
|
|
||||||
// Check if this node exists in the database
|
// Check if this node exists in the database
|
||||||
db.Get(client.xdbNodeKey, function (err, nodes) {
|
db.Get(client.xdbNodeKey, function (err, nodes) {
|
||||||
if ((nodes == null) || (nodes.length != 1)) { callback(null, false); return; } // Node does not exist
|
if ((nodes == null) || (nodes.length != 1)) { callback(authError, null); return; } // Node does not exist
|
||||||
|
|
||||||
// If this device now has a different meshid, fix it here.
|
// If this device now has a different meshid, fix it here.
|
||||||
client.xdbMeshKey = nodes[0].meshid;
|
client.xdbMeshKey = nodes[0].meshid;
|
||||||
|
@ -93,27 +105,33 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if a client can publish a packet
|
// Check if a client can publish a packet
|
||||||
obj.aedes.authorizePublish = function (client, packet, callback) {
|
aedes.authorizeSubscribe = function (client, sub, callback) {
|
||||||
// TODO: add authorized publish control
|
// Subscription control
|
||||||
//console.log(packet);
|
obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
||||||
obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
if (allowedSubscriptionTopics.indexOf(sub.topic) === -1) { sub = null; } // If not a supported subscription, deny it.
|
||||||
callback(null);
|
callback(null, sub); // We authorize supported topics, but will not allow agents to publish anything to other agents.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if a client can publish a packet
|
// Check if a client can publish a packet
|
||||||
obj.aedes.authorizeSubscribe = function (client, sub, callback) {
|
aedes.authorizePublish = function (client, packet, callback) {
|
||||||
// TODO: add subscription control here
|
// Handle a published message
|
||||||
obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
||||||
callback(null, sub);
|
handleMessage(client.xdbNodeKey, client.xdbNodeKey, packet.topic, packet.payload);
|
||||||
|
//callback(denyError); // Deny all, clients can't publish anything to other agents.
|
||||||
|
//callback(null); // Deny all, clients can't publish anything to other agents.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if a client can forward a packet
|
// Check if a client can forward a packet
|
||||||
obj.aedes.authorizeForward = function (client, packet) {
|
//aedes.authorizeForward = function (client, packet) {
|
||||||
// TODO: add forwarding control
|
// TODO: add forwarding control
|
||||||
//console.log(packet);
|
//obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
||||||
obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
|
|
||||||
//return packet;
|
//return packet;
|
||||||
return packet;
|
//}
|
||||||
|
|
||||||
|
// Handle messages coming from clients
|
||||||
|
function handleMessage(nodeid, meshid, topic, message) {
|
||||||
|
console.log('handleMessage', nodeid, topic, message.toString());
|
||||||
|
obj.publish(nodeid, 'abc', "This is a server reply");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean a IPv6 address that encodes a IPv4 address
|
// Clean a IPv6 address that encodes a IPv4 address
|
||||||
|
|
Loading…
Reference in New Issue