More MQTT improvements.

This commit is contained in:
Ylian Saint-Hilaire 2019-10-08 11:08:41 -07:00
parent 241b42ac00
commit 00e6ff0e91
5 changed files with 183 additions and 19 deletions

View File

@ -2882,6 +2882,45 @@ module.exports.CreateMeshUser = function (parent, db, ws, req, args, domain, use
}
break;
}
case 'sendmqttmsg': {
if (common.validateArray(command.nodeids, 1) == false) { err = 'Invalid nodeids'; }; // Check nodeid's
if (common.validateString(command.topic, 1, 64) == false) { err = 'Invalid topic'; } // Check the topic
if (common.validateString(command.msg, 1, 4096) == false) { err = 'Invalid msg'; } // Check the message
// Handle any errors
if (err != null) {
if (command.responseid != null) { try { ws.send(JSON.stringify({ action: 'sendmqttmsg', responseid: command.responseid, result: err })); } catch (ex) { } }
break;
}
// TODO: We can optimize this a lot.
// - We should get a full list of all MAC's to wake first.
// - We should try to only have one agent per subnet (using Gateway MAC) send a wake-on-lan.
for (i in command.nodeids) {
nodeid = command.nodeids[i];
var wakeActions = 0;
if (common.validateString(nodeid, 1, 1024) == false) break; // Check nodeid
if ((nodeid.split('/').length == 3) && (nodeid.split('/')[1] == domain.id)) { // Validate the domain, operation only valid for current domain
// Get the device
db.Get(nodeid, function (err, nodes) {
if ((nodes == null) || (nodes.length != 1)) return;
var node = nodes[0];
// Get the mesh for this device
mesh = parent.meshes[node.meshid];
if (mesh) {
// Check if this user has rights to do this
if (mesh.links[user._id] != null && ((mesh.links[user._id].rights & 64) != 0)) {
// If this device is connected on MQTT, send a wake action.
if (parent.parent.mqttbroker != null) { parent.parent.mqttbroker.publish(node._id, command.topic, command.msg); }
}
}
});
}
}
break;
}
case 'getmqttlogin': {
var err = null;
if (parent.parent.mqttbroker == null) { err = 'MQTT not supported on this server'; }
@ -2927,7 +2966,6 @@ module.exports.CreateMeshUser = function (parent, db, ws, req, args, domain, use
}
});
}
break;
}
case 'amt': {

View File

@ -107,7 +107,7 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
aedes.authorizePublish = function (client, packet, callback) {
// Handle a published message
obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
handleMessage(client.xdbNodeKey, client.xdbNodeKey, packet.topic, packet.payload);
handleMessage(client.xdbNodeKey, client.xdbMeshKey, packet.topic, packet.payload);
// We don't accept that any client message be published, so don't call the callback.
}
@ -129,7 +129,9 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
// Handle messages coming from clients
function handleMessage(nodeid, meshid, topic, message) {
// TODO: Handle messages here.
// Handle messages here
if (topic == 'console') { routeMessage({ action: 'msg', type: 'console', value: message.toString(), source: 'MQTT' }, nodeid, meshid); return; } // Handle console messages
//console.log('handleMessage', nodeid, topic, message.toString());
//obj.publish(nodeid, 'echoTopic', "Echo: " + message.toString());
}
@ -137,5 +139,79 @@ module.exports.CreateMQTTBroker = function (parent, db, args) {
// Clean a IPv6 address that encodes a IPv4 address
function cleanRemoteAddr(addr) { if (typeof addr != 'string') { return null; } if (addr.indexOf('::ffff:') == 0) { return addr.substring(7); } else { return addr; } }
// Route a message
function routeMessage(command, dbNodeKey, dbMeshKey) {
// Route a message.
// If this command has a sessionid, that is the target.
if (command.sessionid != null) {
if (typeof command.sessionid != 'string') return;
var splitsessionid = command.sessionid.split('/');
// Check that we are in the same domain and the user has rights over this node.
if ((splitsessionid[0] == 'user') && (splitsessionid[1] == domain.id)) {
// Check if this user has rights to get this message
//if (mesh.links[user._id] == null || ((mesh.links[user._id].rights & 16) == 0)) return; // TODO!!!!!!!!!!!!!!!!!!!!!
// See if the session is connected. If so, go ahead and send this message to the target node
var ws = parent.webserver.wssessions2[command.sessionid];
if (ws != null) {
command.nodeid = dbNodeKey; // Set the nodeid, required for responses.
delete command.sessionid; // Remove the sessionid, since we are sending to that sessionid, so it's implyed.
try { ws.send(JSON.stringify(command)); } catch (ex) { }
} else if (parent.multiServer != null) {
// See if we can send this to a peer server
var serverid = parent.webserver.wsPeerSessions2[command.sessionid];
if (serverid != null) {
command.fromNodeid = dbNodeKey;
parent.multiServer.DispatchMessageSingleServer(command, serverid);
}
}
}
} else if (command.userid != null) { // If this command has a userid, that is the target.
if (typeof command.userid != 'string') return;
var splituserid = command.userid.split('/');
// Check that we are in the same domain and the user has rights over this node.
if ((splituserid[0] == 'user') && (splituserid[1] == domain.id)) {
// Check if this user has rights to get this message
//if (mesh.links[user._id] == null || ((mesh.links[user._id].rights & 16) == 0)) return; // TODO!!!!!!!!!!!!!!!!!!!!!
// See if the session is connected
var sessions = parent.webserver.wssessions[command.userid];
// Go ahead and send this message to the target node
if (sessions != null) {
command.nodeid = dbNodeKey; // Set the nodeid, required for responses.
delete command.userid; // Remove the userid, since we are sending to that userid, so it's implyed.
for (i in sessions) { sessions[i].send(JSON.stringify(command)); }
}
if (parent.multiServer != null) {
// TODO: Add multi-server support
}
}
} else { // Route this command to the mesh
command.nodeid = dbNodeKey;
var cmdstr = JSON.stringify(command);
for (var userid in parent.webserver.wssessions) { // Find all connected users for this mesh and send the message
var user = parent.webserver.users[userid];
if ((user != null) && (user.links != null)) {
var rights = user.links[dbMeshKey];
if (rights != null) { // TODO: Look at what rights are needed for message routing
var xsessions = parent.webserver.wssessions[userid];
// Send the message to all users on this server
for (i in xsessions) { try { xsessions[i].send(cmdstr); } catch (e) { } }
}
}
}
// Send the message to all users of other servers
if (parent.multiServer != null) {
delete command.nodeid;
command.fromNodeid = dbNodeKey;
command.meshid = dbMeshKey;
parent.multiServer.DispatchMessage(command);
}
}
}
return obj;
}

View File

@ -1,6 +1,6 @@
{
"name": "meshcentral",
"version": "0.4.1-w",
"version": "0.4.1-x",
"keywords": [
"Remote Management",
"Intel AMT",

File diff suppressed because one or more lines are too long

View File

@ -702,6 +702,12 @@
<input id=p15consoleText style=width:100% onkeyup=p15consoleSend(event) onfocus=onConsoleFocus(1) onblur=onConsoleFocus(0) />
</td>
<td>&nbsp;</td>
<td id="p15outputselecttd">
<select id=p15outputselect>
<option value=1>Agent</option>
<option value=2>MQTT</option>
</select>
</td>
<td style="width:1%"><input id="id_p15consoleClear" type="button" class="bottombutton" value="Clear" onclick="p15consoleClear()"></td>
</tr>
</table>
@ -1591,7 +1597,7 @@
if (nodes != null) { for (var i in nodes) { if (nodes[i]._id == message.nodeid) { index = i; break; } } }
if (index != -1) {
// Node was found, dispatch the message
if (message.type == 'console') { p15consoleReceive(nodes[index], message.value); } // This is a console message.
if (message.type == 'console') { p15consoleReceive(nodes[index], message.value, message.source); } // This is a console message.
else if (message.type == 'notify') { // This is a notification message.
var n = getstore('notifications', 0);
if (((n & 8) == 0) && (message.amtMessage != null)) { break; } // Intel AMT desktop & terminal messages should be ignored.
@ -3282,8 +3288,13 @@
}
function groupActionFunction() {
var mqttx = '';
if (features & 0x00400000) { // Check if any of the selected devices have a MQTT connection active
var nodeids = getCheckedDevices();
for (var i in nodeids) { if ((getNodeFromId(nodeids[i]).conn & 16) != 0) { mqttx = '<option value=103>Send MQTT Message</option>'; } }
}
var x = "Select an operation to perform on all selected devices. Actions will be performed only with proper rights.<br /><br />";
x += addHtmlValue('Operation', '<select id=d2groupop><option value=100>Wake-up devices</option><option value=4>Sleep devices</option><option value=3>Reset devices</option><option value=2>Power off devices</option><option value=102>Move to device group</option><option value=101>Delete devices</option></select>');
x += addHtmlValue('Operation', '<select id=d2groupop><option value=100>Wake-up devices</option><option value=4>Sleep devices</option><option value=3>Reset devices</option><option value=2>Power off devices</option><option value=102>Move to device group</option>' + mqttx + '<option value=101>Delete devices</option></select>');
setDialogMode(2, "Group Action", 3, groupActionFunctionEx, x);
}
@ -3308,6 +3319,9 @@
} else if (op == 102) {
// Move computers to a different group
p10showChangeGroupDialog(getCheckedDevices());
} else if (op == 103) {
// Send MQTT Message
p10showSendMqttMsgDialog(getCheckedDevices());
} else {
// Power operation
meshserver.send({ action: 'poweraction', nodeids: getCheckedDevices(), actiontype: parseInt(op) });
@ -4434,6 +4448,7 @@
var y = '<select id=d2deviceop style=float:right;width:250px>';
if ((meshrights & 64) != 0) { y += '<option value=100>Wake-up</option>'; } // Wake-up permission
if ((meshrights & 8) != 0) { y += '<option value=4>Sleep</option><option value=3>Reset</option><option value=2>Power off</option>'; } // Remote control permission
if ((currentNode.conn & 16) != 0) { y += '<option value=103>Send MQTT Message</option>'; }
y += '</select>';
x += addHtmlValue('Operation', y);
setDialogMode(2, "Device Action", 3, deviceActionFunctionEx, x);
@ -4443,7 +4458,10 @@
var op = Q('d2deviceop').value;
if (op == 100) {
// Device wake
meshserver.send({ action: 'wakedevices', nodeids: [ currentNode._id ] });
meshserver.send({ action: 'wakedevices', nodeids: [currentNode._id] });
} else if (op == 103) {
// Send MQTT Message
p10showSendMqttMsgDialog([currentNode._id]);
} else {
// Power operation
meshserver.send({ action: 'poweraction', nodeids: [ currentNode._id ], actiontype: parseInt(op) });
@ -4572,6 +4590,24 @@
}
}
function p10showSendMqttMsgDialog(nodeids) {
if (xxdialogMode) return false;
var x = addHtmlValue('Topic', '<input id=dp2topic style=width:230px maxlength=64 onchange=p10validateSendMqttMsgDialog() onkeyup=p10validateSendMqttMsgDialog(event,1) />');
x += addHtmlValue('Message', '<div style=width:230px;margin:0;padding:0><textarea id=dp2msg maxlength=4096 style=width:100%;height:150px;resize:none onchange=p10validateSendMqttMsgDialog() onkeyup=p10validateSendMqttMsgDialog(event,1)></textarea></div>');
setDialogMode(2, "Send MQTT message", 3, p10showSendMqttMsgDialogEx, x, nodeids);
p10validateSendMqttMsgDialog();
Q('dp2topic').focus();
return false;
}
function p10validateSendMqttMsgDialog() {
QE('idx_dlgOkButton', (Q('dp2topic').value.length > 0) && (Q('dp2msg').value.length > 0));
}
function p10showSendMqttMsgDialogEx(b, nodeids) {
meshserver.send({ action: 'sendmqttmsg', nodeids: nodeids, topic: Q('dp2topic').value, msg: Q('dp2msg').value });
}
function p10showChangeGroupDialog(nodeids) {
if (xxdialogMode) return false;
var targetMeshId = null;
@ -6416,6 +6452,7 @@
QE('p15consoleText', true);
QH('p15statetext', '');
QH('p15coreName', '');
QV('p15outputselecttd', false);
if (samenode == false) {
QH('p15agentConsoleText', consoleServerText);
@ -6434,14 +6471,18 @@
QH('p15agentConsoleText', consoleNode.consoleText);
Q('p15agentConsoleText').scrollTop = Q('p15agentConsoleText').scrollHeight;
}
var online = ((consoleNode.conn & 1) != 0) ? true : false;
QH('p15statetext', online ? "Agent is online" : "Agent is offline");
var online = (((consoleNode.conn & 1) != 0) || ((consoleNode.conn & 16) != 0)) ? true : false;
var onlineText = ((consoleNode.conn & 1) != 0) ? "Agent is online" : "Agent is offline"
if ((consoleNode.conn & 16) != 0) { onlineText += ', MQTT is online' }
QH('p15statetext', onlineText);
QE('p15consoleText', online);
QE('p15uploadCore', online);
QV('p15outputselecttd', (consoleNode.conn & 17) == 17);
} else {
QH('p15statetext', 'Access Denied');
QE('p15consoleText', false);
QE('p15uploadCore', false);
QV('p15outputselecttd', false);
}
}
}
@ -6461,21 +6502,29 @@
var consoleHistory = [];
function p15consoleSend(e) {
if (e && e.keyCode != 13) return;
var v = Q('p15consoleText').value, t = '<div style=color:green>&gt; ' + EscapeHtml(Q('p15consoleText').value) + '<br/></div>';
Q('p15agentConsoleText').innerHTML += t;
Q('p15agentConsoleText').scrollTop = Q('p15agentConsoleText').scrollHeight;
Q('p15consoleText').value = '';
var v = Q('p15consoleText').value, t = '<div style=color:green>&gt; ' + v + '<br/></div>';
if (xxcurrentView == 115) {
// Send the command to the server - TODO: In the future, we may support multiple servers.
consoleServerText += t;
meshserver.send({ action: 'serverconsole', value: v });
} else {
// Send the command to the mesh agent
consoleNode.consoleText += t;
meshserver.send({ action: 'msg', type: 'console', nodeid: consoleNode._id, value: v });
if (((consoleNode.conn & 16) != 0) && ((Q('p15outputselect').value == 2) || ((consoleNode.conn & 1) == 0))) {
// Send the command to MQTT
t = '<div style=color:orange>MQTT&gt; ' + EscapeHtml(v) + '<br/></div>';
consoleNode.consoleText += t;
meshserver.send({ action: 'sendmqttmsg', topic: 'console', nodeids: [ consoleNode._id ], msg: v });
} else {
// Send the command to the mesh agent
consoleNode.consoleText += t;
meshserver.send({ action: 'msg', type: 'console', nodeid: consoleNode._id, value: v });
}
}
Q('p15agentConsoleText').innerHTML += t;
Q('p15agentConsoleText').scrollTop = Q('p15agentConsoleText').scrollHeight;
Q('p15consoleText').value = '';
// Add command to history list
if (v.length > 0) {
// Move this command to the top if it already exists
@ -6487,10 +6536,10 @@
}
// Handle Mesh Agent console data
function p15consoleReceive(node, data) {
data = '<div>' + data + '</div>'
function p15consoleReceive(node, data, source) {
if (node === 'serverconsole') {
// Server console data
data = '<div>' + EscapeHtml(data) + '</div>'
consoleServerText += data;
if (consoleNode == 'server') {
Q('p15agentConsoleText').innerHTML += data;
@ -6498,6 +6547,7 @@
}
} else {
// Agent console data
if (source == 'MQTT') { data = '<div style=color:red>MQTT&gt; ' + EscapeHtml(data) + '<br/></div>'; } else { data = '<div>' + EscapeHtml(data) + '</div>' }
if (node.consoleText == null) { node.consoleText = data; } else { node.consoleText += data; }
if (consoleNode == node) {
Q('p15agentConsoleText').innerHTML += data;