From 9fbe211cadf3211b44d0b506631d9fe6712c71b1 Mon Sep 17 00:00:00 2001 From: Ylian Saint-Hilaire Date: Fri, 4 Oct 2019 17:24:30 -0700 Subject: [PATCH] More MQTT improvements --- meshcentral.js | 8 +-- mpsserver.js | 6 +- mqttbroker.js | 86 +++++++++++++++++++++++------ package.json | 2 +- views/default-min.handlebars | 2 +- views/default-mobile-min.handlebars | 2 +- views/default-mobile.handlebars | 3 + views/default.handlebars | 7 +++ webserver.js | 15 +++-- 9 files changed, 99 insertions(+), 32 deletions(-) diff --git a/meshcentral.js b/meshcentral.js index 17f96280..98f09b44 100644 --- a/meshcentral.js +++ b/meshcentral.js @@ -826,7 +826,7 @@ function CreateMeshCentralServer(config, args) { obj.apfserver = require('./apfserver.js').CreateApfServer(obj, obj.db, obj.args); // Create MQTT Broker to hook into webserver and mpsserver - if (obj.config.mqtt != null) { obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj, obj.db, obj.args); } + if (obj.config.settings.mqtt != null) { obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj, obj.db, obj.args); } // Start the web server and if needed, the redirection web server. obj.webserver = require('./webserver.js').CreateWebServer(obj, obj.db, obj.args, obj.certificates); @@ -1100,9 +1100,9 @@ function CreateMeshCentralServer(config, args) { // meshId: mesh identifier of format mesh/domain/meshidhex // nodeId: node identifier of format node/domain/nodeidhex // connectTime: time of connection, milliseconds elapsed since the UNIX epoch. - // connectType: Bitmask, 1 = MeshAgent, 2 = Intel AMT CIRA, 4 = Intel AMT local. + // connectType: Bitmask, 1 = MeshAgent, 2 = Intel AMT CIRA, 4 = Intel AMT local, 8 = Intel AMT Relay, 16 = MQTT // powerState: Value, 0 = Unknown, 1 = S0 power on, 2 = S1 Sleep, 3 = S2 Sleep, 4 = S3 Sleep, 5 = S4 Hibernate, 6 = S5 Soft-Off, 7 = Present - //var connectTypeStrings = ['', 'MeshAgent', 'Intel AMT CIRA', '', 'Intel AMT local']; + //var connectTypeStrings = ['', 'MeshAgent', 'Intel AMT CIRA', '', 'Intel AMT local', '', '', '', 'Intel AMT Relay', '', '', '', '', '', '', '', 'MQTT']; //var powerStateStrings = ['Unknown', 'Powered', 'Sleep', 'Sleep', 'Deep Sleep', 'Hibernating', 'Soft-Off', 'Present']; obj.SetConnectivityState = function (meshid, nodeid, connectTime, connectType, powerState, serverid) { //console.log('SetConnectivity for ' + nodeid.substring(0, 16) + ', Type: ' + connectTypeStrings[connectType] + ', Power: ' + powerStateStrings[powerState] + (serverid == null ? ('') : (', ServerId: ' + serverid))); @@ -1829,7 +1829,7 @@ function mainStart() { if (require('os').platform() == 'win32') { modules.push('node-windows'); if (sspi == true) { modules.push('node-sspi'); } } // Add Windows modules if (ldap == true) { modules.push('ldapauth-fork'); } if (config.letsencrypt != null) { modules.push('greenlock'); modules.push('le-store-certbot'); modules.push('le-challenge-fs'); modules.push('le-acme-core'); } // Add Greenlock Modules - if (config.mqtt != null) { modules.push('mqtt'); modules.push('aedes'); } // Add MQTT Modules + if (config.settings.mqtt != null) { modules.push('aedes'); } // Add MQTT Modules if (config.settings.mongodb != null) { modules.push('mongodb'); } // Add MongoDB, official driver. else if (config.settings.xmongodb != null) { modules.push('mongojs'); } // Add MongoJS, old driver. if (config.smtp != null) { modules.push('nodemailer'); } // Add SMTP support diff --git a/mpsserver.js b/mpsserver.js index 0a6e8335..7135ecfd 100644 --- a/mpsserver.js +++ b/mpsserver.js @@ -238,9 +238,11 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) { socket.removeAllListeners("close"); socket.setNoDelay(true); socket.serialtunnel = SerialTunnel(); - socket.on('data', function (b) { socket.serialtunnel.updateBuffer(Buffer.from(b, 'binary')) }); + socket.serialtunnel.xtransport = 'mps'; + socket.serialtunnel.xip = socket.remoteAddress; + socket.on("data", function (b) { socket.serialtunnel.updateBuffer(Buffer.from(b, "binary")) }); socket.serialtunnel.forwardwrite = function (b) { socket.write(b, "binary") } - socket.on("close", function () { socket.serialtunnel.emit('end'); }); + socket.on("close", function () { socket.serialtunnel.emit("end"); }); // Pass socket wrapper to the MQTT broker parent.mqttbroker.handle(socket.serialtunnel); diff --git a/mqttbroker.js b/mqttbroker.js index 7836d0c3..42ea6cee 100644 --- a/mqttbroker.js +++ b/mqttbroker.js @@ -1,51 +1,101 @@ /** * @description MQTT broker reference implementation based on AEDES -* @author Joko Banu Sastriawan +* @author Joko Banu Sastriawan, Ylian Saint-Hilaire * @copyright Intel Corporation 2018-2019 * @license Apache-2.0 * @version v0.0.1 */ module.exports.CreateMQTTBroker = function (parent, db, args) { - - // Internal objects container + var obj = {} obj.parent = parent; obj.db = db; obj.args = args; obj.aedes = require("aedes")(); - - // argument parsing -- tbd - - // event handling and filtering - // authentication filter + obj.handle = obj.aedes.handle; + obj.connections = {}; // NodesID --> client array + + // Connection Authentication obj.aedes.authenticate = function (client, username, password, callback) { // TODO: add authentication handler - obj.parent.debug("mqtt", "Authentication with " + username + ":" + password); - callback(null, true); + obj.parent.debug("mqtt", "Authentication with " + username + ":" + password + ":" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + + var usersplit = username.split(':'); + if (usersplit.length != 5) { callback(null, false); return; } + + // Setup the identifiers + var xnodeid = usersplit[1]; + var xmeshid = usersplit[2]; + var xdomainid = usersplit[3]; + + // Convert meshid from HEX to Base64 if needed + if (xmeshid.length == 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); } + if ((xmeshid.length != 64) || (xnodeid.length != 64)) { callback(null, false); return; } + + client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid; + client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid; + + // Check if this node exists in the database + db.Get(client.xdbNodeKey, function (err, nodes) { + if ((nodes == null) || (nodes.length != 1)) { callback(null, false); return; } // Node does not exist + + // If this device now has a different meshid, fix it here. + client.xdbMeshKey = nodes[0].meshid; + + if (obj.connections[client.xdbNodeKey] == null) { + obj.connections[client.xdbNodeKey] = [client]; + parent.SetConnectivityState(client.xdbMeshKey, client.xdbNodeKey, Date.now(), 16, 7); // Indicate this node has a MQTT connection, 7 = Present state + } else { + obj.connections[client.xdbNodeKey].push(client); + } + + client.conn.parent = client; + client.conn.on('end', function () { + // client is "this.parent" + obj.parent.debug("mqtt", "Connection closed, " + this.parent.conn.xtransport + "://" + cleanRemoteAddr(this.parent.conn.xip)); + + // Remove this client from the connections list + if ((this.parent.xdbNodeKey != null) && (obj.connections[this.parent.xdbNodeKey] != null)) { + var clients = obj.connections[this.parent.xdbNodeKey], i = clients.indexOf(client); + if (i >= 0) { + if (clients.length == 1) { + delete obj.connections[this.parent.xdbNodeKey]; + parent.ClearConnectivityState(this.parent.xdbMeshKey, this.parent.xdbNodeKey, 16); // Remove the MQTT connection for this node + } else { clients.splice(i, 1); } + } + } + + this.parent.close(); + }); + callback(null, true); + }); } - - // check if a client can publish a packet + + // Check if a client can publish a packet obj.aedes.authorizePublish = function (client, packet, callback) { // TODO: add authorized publish control - obj.parent.debug("mqtt", "AuthorizePublish"); + obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null); } - + // Check if a client can publish a packet obj.aedes.authorizeSubscribe = function (client, sub, callback) { // TODO: add subscription control here - obj.parent.debug("mqtt", "AuthorizeSubscribe"); + obj.parent.debug("mqtt", "AuthorizeSubscribe, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, sub); } - + // Check if a client can publish a packet obj.aedes.authorizeForward = function (client, packet) { // TODO: add forwarding control - obj.parent.debug("mqtt", "AuthorizeForward"); + obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + //return packet; return packet; } - obj.handle = obj.aedes.handle; + // Clean a IPv6 address that encodes a IPv4 address + function cleanRemoteAddr(addr) { if (typeof addr != 'string') { return null; } if (addr.indexOf('::ffff:') == 0) { return addr.substring(7); } else { return addr; } } + return obj; } diff --git a/package.json b/package.json index dd3173b8..36e1b4b5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "meshcentral", - "version": "0.4.1-s", + "version": "0.4.1-t", "keywords": [ "Remote Management", "Intel AMT", diff --git a/views/default-min.handlebars b/views/default-min.handlebars index b7bf1f00..6c7f7d99 100644 --- a/views/default-min.handlebars +++ b/views/default-min.handlebars @@ -1 +1 @@ - {{{title}}}
{{{title}}}
{{{title2}}}

{{{logoutControl}}}

 

\ No newline at end of file + {{{title}}}
{{{title}}}
{{{title2}}}

{{{logoutControl}}}

 

\ No newline at end of file diff --git a/views/default-mobile-min.handlebars b/views/default-mobile-min.handlebars index ad9bf7e9..abff9cbb 100644 --- a/views/default-mobile-min.handlebars +++ b/views/default-mobile-min.handlebars @@ -1 +1 @@ - {{{title}}}
{{{title}}}
{{{title2}}}
\ No newline at end of file + {{{title}}}
{{{title}}}
{{{title2}}}
\ No newline at end of file diff --git a/views/default-mobile.handlebars b/views/default-mobile.handlebars index 86c59a10..4e5fdfaf 100644 --- a/views/default-mobile.handlebars +++ b/views/default-mobile.handlebars @@ -1771,6 +1771,7 @@ if ((node.conn & 2) != 0) { states.push('CIRA'); } else if ((node.conn & 4) != 0) { states.push('Intel® AMT'); } if ((node.conn & 8) != 0) { states.push('Relay'); } + if ((node.conn & 16) != 0) { states.push('MQTT'); } } if ((node.pwr != null) && (node.pwr != 0)) { states.push(powerStateStrings[node.pwr]); } return states.join(', '); @@ -1945,6 +1946,7 @@ if ((node.conn & 2) != 0) cstate.push('Intel® AMT CIRA'); else if ((node.conn & 4) != 0) cstate.push('Intel® AMT'); if ((node.conn & 8) != 0) cstate.push('Agent Relay'); + if ((node.conn & 16) != 0) cstate.push('MQTT'); x += addDeviceAttribute('Connectivity', cstate.join(', ')); } @@ -1984,6 +1986,7 @@ if ((connectivity & 1) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += 'Mesh Agent'; } if ((connectivity & 2) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += 'Intel® AMT connected'; } else if ((connectivity & 4) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += 'Intel® AMT detected'; } + if ((connectivity & 16) != 0) { if (powerstate.length > 0) { powerstate += '
'; } powerstate += 'MQTT channel connected'; } QH('MainComputerState', powerstate); // Set the node icon diff --git a/views/default.handlebars b/views/default.handlebars index f0bc0e30..c7889394 100644 --- a/views/default.handlebars +++ b/views/default.handlebars @@ -2151,11 +2151,13 @@ if (((node.conn & 1) == 0) && ((message.event.conn & 1) != 0)) { addNotification({ text: 'Agent connected', title: node.name, icon: node.icon, nodeid: node._id }); } if (((node.conn & 2) == 0) && ((message.event.conn & 2) != 0)) { addNotification({ text: 'Intel AMT detected', title: node.name, icon: node.icon, nodeid: node._id }); } if (((node.conn & 4) == 0) && ((message.event.conn & 4) != 0)) { addNotification({ text: 'Intel AMT CIRA connected', title: node.name, icon: node.icon, nodeid: node._id }); } + if (((node.conn & 16) == 0) && ((message.event.conn & 16) != 0)) { addNotification({ text: 'MQTT connected', title: node.name, icon: node.icon, nodeid: node._id }); } } if (n & 4) { if (((node.conn & 1) != 0) && ((message.event.conn & 1) == 0)) { addNotification({ text: 'Agent disconnected', title: node.name, icon: node.icon, nodeid: node._id }); } if (((node.conn & 2) != 0) && ((message.event.conn & 2) == 0)) { addNotification({ text: 'Intel AMT not detected', title: node.name, icon: node.icon, nodeid: node._id }); } if (((node.conn & 4) != 0) && ((message.event.conn & 4) == 0)) { addNotification({ text: 'Intel AMT CIRA disconnected', title: node.name, icon: node.icon, nodeid: node._id }); } + if (((node.conn & 16) != 0) && ((message.event.conn & 16) == 0)) { addNotification({ text: 'MQTT disconnected', title: node.name, icon: node.icon, nodeid: node._id }); } } // Change the node connection state @@ -2569,6 +2571,7 @@ if ((node.conn & 2) != 0) { states.push('CIRA'); } else if ((node.conn & 4) != 0) { states.push('AMT'); } if ((node.conn & 8) != 0) { states.push('Relay'); } + if ((node.conn & 16) != 0) { states.push('MQTT'); } } r += '
'; r += '
'; @@ -3221,6 +3224,7 @@ if ((node.conn & 2) != 0) { states.push('CIRA'); } else if ((node.conn & 4) != 0) { states.push('Intel® AMT'); } if ((node.conn & 8) != 0) { states.push('Relay'); } + if ((node.conn & 16) != 0) { states.push('MQTT'); } } if ((node.pwr != null) && (node.pwr != 0)) { states.push(powerStateStrings[node.pwr]); } return states.join(', '); @@ -4233,6 +4237,7 @@ if ((node.conn & 2) != 0) cstate.push('Intel® AMT CIRA'); else if ((node.conn & 4) != 0) cstate.push('Intel® AMT'); if ((node.conn & 8) != 0) cstate.push('Mesh Relay'); + if ((node.conn & 16) != 0) { cstate.push('MQTT'); } x += addDeviceAttribute('Connectivity', cstate.join(', ')); } @@ -4285,6 +4290,7 @@ if ((connectivity & 1) != 0) { if (powerstate.length > 0) { powerstate += '
'; } powerstate += 'Agent connected'; } if ((connectivity & 2) != 0) { if (powerstate.length > 0) { powerstate += '
'; } powerstate += 'Intel® AMT connected'; } else if ((connectivity & 4) != 0) { if (powerstate.length > 0) { powerstate += '
'; } powerstate += 'Intel® AMT detected'; } + if ((connectivity & 16) != 0) { if (powerstate.length > 0) { powerstate += '
'; } powerstate += 'MQTT channel connected'; } if ((powerstate == '') && node.lastconnect) { powerstate = 'Last seen:
' + printDateTime(new Date(node.lastconnect)) + '
'; } QH('MainComputerState', powerstate); @@ -7136,6 +7142,7 @@ if (xxdialogMode) return; var x = '', consent = (currentMesh.consent) ? currentMesh.consent : 0; x += '
Desktop
'; + if (debugmode) { x += "
"; } x += "
"; x += "
"; x += '
Terminal
'; diff --git a/webserver.js b/webserver.js index 06a58dce..98506eed 100644 --- a/webserver.js +++ b/webserver.js @@ -3326,11 +3326,16 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) { // Setup MQTT broker over websocket if (obj.parent.mqttbroker != null) { obj.app.ws(url + 'mqtt.ashx', function (ws, req) { - var ser = SerialTunnel(); - ws.on('message', function (b) { ser.updateBuffer(Buffer.from(b, 'binary')) }); - ser.forwardwrite = function (b) { ws.send(b, "binary") } - ws.on("close", function () { ser.emit('end'); }); - obj.parent.mqttbroker.handle(ser); // Pass socket wrapper to MQTT broker + var domain = checkAgentIpAddress(ws, req); + if (domain == null) { parent.debug('web', 'Got agent connection from blocked IP address ' + cleanRemoteAddr(req.ip) + ', holding.'); return; } + var serialtunnel = SerialTunnel(); + serialtunnel.xtransport = 'ws'; + serialtunnel.xdomain = domain; + serialtunnel.xip = req.ip; + ws.on('message', function (b) { serialtunnel.updateBuffer(Buffer.from(b, 'binary')) }); + serialtunnel.forwardwrite = function (b) { ws.send(b, "binary") } + ws.on("close", function () { serialtunnel.emit('end'); }); + obj.parent.mqttbroker.handle(serialtunnel); // Pass socket wrapper to MQTT broker }); }