mirror of
				https://github.com/Ylianst/MeshCentral.git
				synced 2025-10-29 15:25:01 -04:00 
			
		
		
		
	More MQTT improvements.
This commit is contained in:
		
							parent
							
								
									08bf681afa
								
							
						
					
					
						commit
						2f70837ae6
					
				| @ -12,9 +12,13 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { | |||||||
|     obj.parent = parent; |     obj.parent = parent; | ||||||
|     obj.db = db; |     obj.db = db; | ||||||
|     obj.args = args; |     obj.args = args; | ||||||
|     obj.aedes = require("aedes")(); |  | ||||||
|     obj.handle = obj.aedes.handle; |  | ||||||
|     obj.connections = {}; // NodesID --> client array
 |     obj.connections = {}; // NodesID --> client array
 | ||||||
|  |     const aedes = require("aedes")(); | ||||||
|  |     obj.handle = aedes.handle; | ||||||
|  |     const allowedSubscriptionTopics = [ 'presence' ]; | ||||||
|  |     const denyError = new Error('denied'); | ||||||
|  |     var authError = new Error('Auth error') | ||||||
|  |     authError.returnCode = 1 | ||||||
| 
 | 
 | ||||||
|     // Generate a username and password for MQTT login
 |     // Generate a username and password for MQTT login
 | ||||||
|     obj.generateLogin = function (meshid, nodeid) { |     obj.generateLogin = function (meshid, nodeid) { | ||||||
| @ -25,19 +29,28 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { | |||||||
|         return { meshid: meshid, nodeid: nodeid, user: username, pass: parent.config.settings.mqtt.auth.keyid + ':' + nonce + ':' + parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + parent.config.settings.mqtt.auth.key).digest("base64") }; |         return { meshid: meshid, nodeid: nodeid, user: username, pass: parent.config.settings.mqtt.auth.keyid + ':' + nonce + ':' + parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + parent.config.settings.mqtt.auth.key).digest("base64") }; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     // Publish a message to a specific nodeid & topic
 | ||||||
|  |     obj.publish = function (nodeid, topic, message) { | ||||||
|  |         var clients = obj.connections[nodeid]; | ||||||
|  |         if (clients == null) return; | ||||||
|  |         if (typeof message == 'string') { message = new Buffer(message); } | ||||||
|  |         for (var i in clients) { clients[i].publish({ cmd: 'publish', qos: 0, topic: topic, payload: message, retain: false }); } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     // Connection Authentication
 |     // Connection Authentication
 | ||||||
|     obj.aedes.authenticate = function (client, username, password, callback) { |     aedes.authenticate = function (client, username, password, callback) { | ||||||
|         obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); |         obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); | ||||||
|  |         console.log('MQTT Connect'); | ||||||
| 
 | 
 | ||||||
|         // Parse the username and password
 |         // Parse the username and password
 | ||||||
|         var usersplit = username.split(':'); |         var usersplit = username.split(':'); | ||||||
|         var passsplit = password.toString().split(':'); |         var passsplit = password.toString().split(':'); | ||||||
|         if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } |         if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } | ||||||
|         if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } |         if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } | ||||||
| 
 | 
 | ||||||
|         // Check authentication
 |         // Check authentication
 | ||||||
|         if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } |         if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } | ||||||
|         if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } |         if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } | ||||||
| 
 | 
 | ||||||
|         // Setup the identifiers
 |         // Setup the identifiers
 | ||||||
|         const xnodeid = usersplit[1]; |         const xnodeid = usersplit[1]; | ||||||
| @ -49,16 +62,15 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { | |||||||
| 
 | 
 | ||||||
|         // Convert meshid from HEX to Base64 if needed
 |         // Convert meshid from HEX to Base64 if needed
 | ||||||
|         if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); } |         if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); } | ||||||
|         if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(null, false); return; } |         if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(authError, null); return; } | ||||||
| 
 | 
 | ||||||
|  |         // Set the client nodeid and meshid
 | ||||||
|         client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid; |         client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid; | ||||||
|         client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid; |         client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid; | ||||||
| 
 | 
 | ||||||
|         //console.log(obj.generateLogin(client.xdbMeshKey, client.xdbNodeKey));
 |  | ||||||
| 
 |  | ||||||
|         // Check if this node exists in the database
 |         // Check if this node exists in the database
 | ||||||
|         db.Get(client.xdbNodeKey, function (err, nodes) { |         db.Get(client.xdbNodeKey, function (err, nodes) { | ||||||
|             if ((nodes == null) || (nodes.length != 1)) { callback(null, false); return; } // Node does not exist
 |             if ((nodes == null) || (nodes.length != 1)) { callback(authError, null); return; } // Node does not exist
 | ||||||
| 
 | 
 | ||||||
|             // If this device now has a different meshid, fix it here.
 |             // If this device now has a different meshid, fix it here.
 | ||||||
|             client.xdbMeshKey = nodes[0].meshid; |             client.xdbMeshKey = nodes[0].meshid; | ||||||
| @ -93,27 +105,33 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Check if a client can publish a packet
 |     // Check if a client can publish a packet
 | ||||||
|     obj.aedes.authorizePublish = function (client, packet, callback) { |     aedes.authorizeSubscribe = function (client, sub, callback) { | ||||||
|         // TODO: add authorized publish control
 |         // Subscription control
 | ||||||
|         //console.log(packet);
 |         obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); | ||||||
|         obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); |         if (allowedSubscriptionTopics.indexOf(sub.topic) === -1) { sub = null; } // If not a supported subscription, deny it.
 | ||||||
|         callback(null); |         callback(null, sub); // We authorize supported topics, but will not allow agents to publish anything to other agents.
 | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Check if a client can publish a packet
 |     // Check if a client can publish a packet
 | ||||||
|     obj.aedes.authorizeSubscribe = function (client, sub, callback) { |     aedes.authorizePublish = function (client, packet, callback) { | ||||||
|         // TODO: add subscription control here
 |         // Handle a published message
 | ||||||
|         obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); |         obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); | ||||||
|         callback(null, sub); |         handleMessage(client.xdbNodeKey, client.xdbNodeKey, packet.topic, packet.payload); | ||||||
|  |         //callback(denyError); // Deny all, clients can't publish anything to other agents.
 | ||||||
|  |         //callback(null); // Deny all, clients can't publish anything to other agents.
 | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Check if a client can forward a packet
 |     // Check if a client can forward a packet
 | ||||||
|     obj.aedes.authorizeForward = function (client, packet) { |     //aedes.authorizeForward = function (client, packet) {
 | ||||||
|         // TODO: add forwarding control
 |         // TODO: add forwarding control
 | ||||||
|         //console.log(packet);
 |         //obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
 | ||||||
|         obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); |  | ||||||
|         //return packet;
 |         //return packet;
 | ||||||
|         return packet; |     //}
 | ||||||
|  | 
 | ||||||
|  |     // Handle messages coming from clients
 | ||||||
|  |     function handleMessage(nodeid, meshid, topic, message) { | ||||||
|  |         console.log('handleMessage', nodeid, topic, message.toString()); | ||||||
|  |         obj.publish(nodeid, 'abc', "This is a server reply"); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Clean a IPv6 address that encodes a IPv4 address
 |     // Clean a IPv6 address that encodes a IPv4 address
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user