mirror of
https://github.com/Ylianst/MeshCentral.git
synced 2024-12-26 07:05:52 -05:00
Merge pull request #530 from jsastriawan/add_mqtt
Added MQTT support over WSS and multiplexed with MPS
This commit is contained in:
commit
8ada7e39dc
@ -27,6 +27,7 @@ function CreateMeshCentralServer(config, args) {
|
|||||||
obj.redirserver = null;
|
obj.redirserver = null;
|
||||||
obj.mpsserver = null;
|
obj.mpsserver = null;
|
||||||
obj.apfserver = null;
|
obj.apfserver = null;
|
||||||
|
obj.mqttbroker = null;
|
||||||
obj.swarmserver = null;
|
obj.swarmserver = null;
|
||||||
obj.mailserver = null;
|
obj.mailserver = null;
|
||||||
obj.amtEventHandler = null;
|
obj.amtEventHandler = null;
|
||||||
@ -823,6 +824,9 @@ function CreateMeshCentralServer(config, args) {
|
|||||||
|
|
||||||
// Create APF server to hook into webserver
|
// Create APF server to hook into webserver
|
||||||
obj.apfserver = require('./apfserver.js').CreateApfServer(obj, obj.db, obj.args);
|
obj.apfserver = require('./apfserver.js').CreateApfServer(obj, obj.db, obj.args);
|
||||||
|
// Create MQTT Broker to hook into webserver and mpsserver
|
||||||
|
obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj,obj.db,obj.args);
|
||||||
|
|
||||||
// Start the web server and if needed, the redirection web server.
|
// Start the web server and if needed, the redirection web server.
|
||||||
obj.webserver = require('./webserver.js').CreateWebServer(obj, obj.db, obj.args, obj.certificates);
|
obj.webserver = require('./webserver.js').CreateWebServer(obj, obj.db, obj.args, obj.certificates);
|
||||||
if (obj.redirserver != null) { obj.redirserver.hookMainWebServer(obj.certificates); }
|
if (obj.redirserver != null) { obj.redirserver.hookMainWebServer(obj.certificates); }
|
||||||
|
63
mpsserver.js
63
mpsserver.js
@ -159,6 +159,44 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// required for TLS piping to MQTT broker
|
||||||
|
function SerialTunnel(options) {
|
||||||
|
var obj = new require('stream').Duplex(options);
|
||||||
|
obj.forwardwrite = null;
|
||||||
|
obj.updateBuffer = function (chunk) { this.push(chunk); };
|
||||||
|
obj._write = function (chunk, encoding, callback) { if (obj.forwardwrite != null) { obj.forwardwrite(chunk); } else { console.err("Failed to fwd _write."); } if (callback) callback(); }; // Pass data written to forward
|
||||||
|
obj._read = function (size) { }; // Push nothing, anything to read should be pushed from updateBuffer()
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getMQTTPacketLength(chunk) {
|
||||||
|
var packet_len = 0;
|
||||||
|
if (chunk.readUInt8(0)==16) {
|
||||||
|
if (chunk.readUInt8(1) < 128 ) {
|
||||||
|
packet_len += chunk.readUInt8(1) + 2;
|
||||||
|
} else {
|
||||||
|
// continuation bit, get real value and do next
|
||||||
|
packet_len += (chunk.readUInt8(1) & 0x7F) + 2;
|
||||||
|
if (chunk.readUInt8(2) < 128) {
|
||||||
|
packet_len += 1 + chunk.readUInt8(2) * 128;
|
||||||
|
} else {
|
||||||
|
packet_len += 1 + (chunk.readUInt8(2) & 0x7F) * 128;
|
||||||
|
if (chunk.readUInt8(3) < 128) {
|
||||||
|
packet_len += 1 + chunk.readUInt8(3) * 128 * 128;
|
||||||
|
} else {
|
||||||
|
packet_len += 1 + (chunk.readUInt8(3) & 0x7F) * 128 * 128;
|
||||||
|
if (chunk.readUInt8(4) < 128) {
|
||||||
|
packet_len += 1 + chunk.readUInt8(4) * 128 * 128 * 128;
|
||||||
|
} else {
|
||||||
|
packet_len += 1 + (chunk.readUInt8(4) & 0x7F) * 128* 128 * 128;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return packet_len;
|
||||||
|
}
|
||||||
|
|
||||||
function onConnection(socket) {
|
function onConnection(socket) {
|
||||||
connectionCount++;
|
connectionCount++;
|
||||||
if (obj.args.mpstlsoffload) {
|
if (obj.args.mpstlsoffload) {
|
||||||
@ -182,6 +220,31 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
|
|||||||
if (socket.tag.accumulator.length < 3) return;
|
if (socket.tag.accumulator.length < 3) return;
|
||||||
//if (!socket.tag.clientCert.subject) { console.log("MPS Connection, no client cert: " + socket.remoteAddress); socket.write('HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.\r\nNo client certificate given.'); socket.end(); return; }
|
//if (!socket.tag.clientCert.subject) { console.log("MPS Connection, no client cert: " + socket.remoteAddress); socket.write('HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.\r\nNo client certificate given.'); socket.end(); return; }
|
||||||
if (socket.tag.accumulator.substring(0, 3) == "GET") { if (args.mpsdebug) { console.log("MPS Connection, HTTP GET detected: " + socket.remoteAddress); } socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n<!DOCTYPE html><html><head><meta charset=\"UTF-8\"></head><body>MeshCentral2 MPS server.<br />Intel® AMT computers should connect here.</body></html>"); socket.end(); return; }
|
if (socket.tag.accumulator.substring(0, 3) == "GET") { if (args.mpsdebug) { console.log("MPS Connection, HTTP GET detected: " + socket.remoteAddress); } socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n<!DOCTYPE html><html><head><meta charset=\"UTF-8\"></head><body>MeshCentral2 MPS server.<br />Intel® AMT computers should connect here.</body></html>"); socket.end(); return; }
|
||||||
|
|
||||||
|
var chunk = Buffer.from(socket.tag.accumulator,"binary");
|
||||||
|
var packet_len = 0;
|
||||||
|
if (chunk.readUInt8(0)==16) {
|
||||||
|
packet_len = getMQTTPacketLength(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (chunk.readUInt8(0)==16 && (socket.tag.accumulator.length < packet_len )) return;// minimum MQTT detection
|
||||||
|
|
||||||
|
// check if it is MQTT, need more initial packet to probe
|
||||||
|
if (chunk.readUInt8(0) == 16 && ((chunk.slice(4, 8).toString() === "MQTT") || (chunk.slice(5, 9).toString() === "MQTT")
|
||||||
|
|| (chunk.slice(6, 10).toString() === "MQTT") || (chunk.slice(7, 11).toString() === "MQTT"))) {
|
||||||
|
parent.debug("mps", "MQTT connection detected.");
|
||||||
|
socket.removeAllListeners("data");
|
||||||
|
socket.removeAllListeners("close");
|
||||||
|
socket.setNoDelay(true);
|
||||||
|
socket.serialtunnel = SerialTunnel();
|
||||||
|
socket.on('data', function(b) { socket.serialtunnel.updateBuffer(Buffer.from(b,'binary'))});
|
||||||
|
socket.serialtunnel.forwardwrite = function(b) { socket.write(b,"binary")}
|
||||||
|
socket.on("close", function() { socket.serialtunnel.emit('end');});
|
||||||
|
//pass socket wrapper to mqtt broker
|
||||||
|
parent.mqttbroker.handle(socket.serialtunnel);
|
||||||
|
socket.unshift(socket.tag.accumulator);
|
||||||
|
return;
|
||||||
|
}
|
||||||
socket.tag.first = false;
|
socket.tag.first = false;
|
||||||
|
|
||||||
// Setup this node with certificate authentication
|
// Setup this node with certificate authentication
|
||||||
|
54
mqttbroker.js
Normal file
54
mqttbroker.js
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
/**
|
||||||
|
* @description MQTT broker reference implementation based on AEDES
|
||||||
|
* @author Joko Banu Sastriawan
|
||||||
|
* @copyright Intel Corporation 2018-2019
|
||||||
|
* @license Apache-2.0
|
||||||
|
* @version v0.0.1
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
module.exports.CreateMQTTBroker = function (parent, db, args) {
|
||||||
|
|
||||||
|
// internal objects container
|
||||||
|
var obj = {}
|
||||||
|
obj.parent = parent;
|
||||||
|
obj.db = db;
|
||||||
|
obj.args = args;
|
||||||
|
|
||||||
|
obj.aedes = require("aedes")();
|
||||||
|
|
||||||
|
|
||||||
|
// argument parsing -- tbd
|
||||||
|
|
||||||
|
// event handling and filtering
|
||||||
|
// authentication filter
|
||||||
|
obj.aedes.authenticate = function (client, username, password, callback) {
|
||||||
|
// accept all user
|
||||||
|
// TODO: add authentication handler
|
||||||
|
obj.parent.debug("mqtt","Authentication with "+username+":"+password);
|
||||||
|
callback(null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if a client can publish a packet
|
||||||
|
obj.aedes.authorizePublish = function (client, packet, callback) {
|
||||||
|
//TODO: add authorized publish control
|
||||||
|
obj.parent.debug("mqtt","AuthorizePublish");
|
||||||
|
callback(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if a client can publish a packet
|
||||||
|
obj.aedes.authorizeSubscribe = function (client, sub, callback) {
|
||||||
|
//TODO: add subscription control here
|
||||||
|
obj.parent.debug("mqtt","AuthorizeSubscribe");
|
||||||
|
callback(null, sub);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if a client can publish a packet
|
||||||
|
obj.aedes.authorizeForward = function (client, packet) {
|
||||||
|
//TODO: add forwarding control
|
||||||
|
obj.parent.debug("mqtt","AuthorizeForward");
|
||||||
|
return packet;
|
||||||
|
}
|
||||||
|
obj.handle = obj.aedes.handle;
|
||||||
|
return obj;
|
||||||
|
}
|
@ -27,6 +27,7 @@
|
|||||||
"sample-config.json"
|
"sample-config.json"
|
||||||
],
|
],
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"aedes": "^0.39.0",
|
||||||
"archiver": "^3.0.0",
|
"archiver": "^3.0.0",
|
||||||
"body-parser": "^1.19.0",
|
"body-parser": "^1.19.0",
|
||||||
"cbor": "4.1.5",
|
"cbor": "4.1.5",
|
||||||
@ -39,9 +40,11 @@
|
|||||||
"ipcheck": "^0.1.0",
|
"ipcheck": "^0.1.0",
|
||||||
"meshcentral": "*",
|
"meshcentral": "*",
|
||||||
"minimist": "^1.2.0",
|
"minimist": "^1.2.0",
|
||||||
|
"mqtt": "^3.0.0",
|
||||||
"multiparty": "^4.2.1",
|
"multiparty": "^4.2.1",
|
||||||
"nedb": "^1.8.0",
|
"nedb": "^1.8.0",
|
||||||
"node-forge": "^0.8.4",
|
"node-forge": "^0.8.4",
|
||||||
|
"otplib": "^11.0.1",
|
||||||
"ws": "^6.2.1",
|
"ws": "^6.2.1",
|
||||||
"xmldom": "^0.1.27",
|
"xmldom": "^0.1.27",
|
||||||
"yauzl": "^2.10.0"
|
"yauzl": "^2.10.0"
|
||||||
|
10
webserver.js
10
webserver.js
@ -3323,6 +3323,16 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) {
|
|||||||
try { obj.meshAgentHandler.CreateMeshAgent(obj, obj.db, ws, req, obj.args, domain); } catch (e) { console.log(e); }
|
try { obj.meshAgentHandler.CreateMeshAgent(obj, obj.db, ws, req, obj.args, domain); } catch (e) { console.log(e); }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// MQTT broker over websocket
|
||||||
|
obj.app.ws(url+'mqtt.ashx', function (ws, req) {
|
||||||
|
var ser = SerialTunnel();
|
||||||
|
ws.on('message', function(b) { ser.updateBuffer(Buffer.from(b,'binary'))});
|
||||||
|
ser.forwardwrite = function(b) { ws.send(b,"binary")}
|
||||||
|
ws.on("close", function() { ser.emit('end');});
|
||||||
|
//pass socket wrapper to mqtt broker
|
||||||
|
obj.parent.mqttbroker.handle(ser);
|
||||||
|
})
|
||||||
|
|
||||||
// Memory Tracking
|
// Memory Tracking
|
||||||
if (typeof obj.args.memorytracking == 'number') {
|
if (typeof obj.args.memorytracking == 'number') {
|
||||||
obj.app.get(url + 'memorytracking.csv', function (req, res) {
|
obj.app.get(url + 'memorytracking.csv', function (req, res) {
|
||||||
|
Loading…
Reference in New Issue
Block a user