From 11ccefc61295cd3f68bf05784e4dc9acb1e992bf Mon Sep 17 00:00:00 2001 From: Ylian Saint-Hilaire Date: Mon, 27 Apr 2020 16:25:54 -0700 Subject: [PATCH] More work on desktop multiplexor. --- meshdesktopmultiplex.js | 174 ++++++++++++++++++++++++++++------------ webserver.js | 1 + 2 files changed, 123 insertions(+), 52 deletions(-) diff --git a/meshdesktopmultiplex.js b/meshdesktopmultiplex.js index a0859fdd..148814db 100644 --- a/meshdesktopmultiplex.js +++ b/meshdesktopmultiplex.js @@ -86,13 +86,11 @@ function CreateDesktopDecoder() { // Add an agent or viewer obj.addPeer = function (peer) { if (peer.req.query.browser) { - console.log('addPeer-viewer'); - - // This is a viewer + //console.log('addPeer-viewer'); + + // Setup the viewer if (obj.viewers.indexOf(peer) >= 0) return true; obj.viewers.push(peer); - - // Setup the viewer peer.desktopPaused = true; peer.imageCompression = 30; peer.imageScaling = 1024; @@ -101,19 +99,22 @@ function CreateDesktopDecoder() { peer.dataPtr = obj.firstData; peer.sending = false; peer.sendQueue = []; + peer.paused = false; + + // Indicated we are connected + obj.sendToViewer(peer, 'c'); } else { - console.log('addPeer-agent'); + //console.log('addPeer-agent'); if (obj.agent != null) return false; - // This is the agent - obj.agent = peer; - // Setup the agent + obj.agent = peer; peer.sending = false; peer.sendQueue = []; - //peer.ws.send('{"tsid":10,"type":"options"}'); - //peer.ws.send('2'); + peer.paused = false; + // Indicated we are connected and send connection options and protocol if needed + obj.sendToAgent('c'); if (obj.viewerConnected == true) { if (obj.protocolOptions != null) { obj.sendToAgent(JSON.stringify(obj.protocolOptions)); } // Send connection options obj.sendToAgent('2'); // Send remote desktop connect @@ -123,23 +124,52 @@ function CreateDesktopDecoder() { } // Remove an agent or viewer + // Return true if this multiplexor is no longer needed. obj.removePeer = function (peer) { - if (peer == agent) { - console.log('removePeer-agent'); + if (peer == obj.agent) { + //console.log('removePeer-agent'); // Clean up the agent + obj.agent = null; // Agent has disconnected, disconnect everyone. + for (var i in obj.viewers) { obj.viewers[i].close(); } + dispose(); + return true; } else { - console.log('removePeer-viewer'); + //console.log('removePeer-viewer'); // Remove a viewer var i = obj.viewers.indexOf(peer); if (i == -1) return false; obj.viewers.splice(i, 1); - // Clean up the viewer - + // Aggressive clean up of the viewer + delete peer.desktopPaused; + delete peer.imageCompression; + delete peer.imageScaling; + delete peer.imageFrameRate; + delete peer.lastImageNumberSent; + delete peer.dataPtr; + delete peer.sending; + delete peer.sendQueue; + + // Resume flow control if this was the peer + if (peer.sending == true) { + obj.viewersSendingCount--; + peer.sending = false; + if ((obj.viewersSendingCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); } + } + + // If this is the last viewer, disconnect the agent + if ((obj.viewers.length == 0) && (obj.agent != null)) { obj.agent.close(); dispose(); return true; } } - return true; + return false; + } + + // Clean up ourselves + function dispose() { + delete obj.viewers; + delete obj.imagesCounters; + delete obj.images; } // Send data to the agent or queue it up for sending @@ -148,21 +178,32 @@ function CreateDesktopDecoder() { //console.log('SendToAgent', data.length); if (obj.agent.sending) { obj.agent.sendQueue.push(data); - // TODO: Flow control, stop all viewers } else { obj.agent.ws.send(data, sendAgentNext); + + // Flow control, pause all viewers + for (var i in obj.viewers) { + var v = obj.viewers[i]; + if (v.paused == false) { v.paused = true; v.ws._socket.pause(); } + } } } // Send more data to the agent function sendAgentNext() { + if (obj.agent == null) return; if (obj.agent.sendQueue.length > 0) { // Send from the pending send queue obj.agent.ws.send(obj.agent.sendQueue.shift(), sendAgentNext); } else { // Nothing to send obj.agent.sending = false; - // TODO: Flow control, start all viewers + + // Flow control, resume all viewers + for (var i in obj.viewers) { + var v = obj.viewers[i]; + if (v.paused == true) { v.paused = false; v.ws._socket.resume(); } + } } } @@ -177,16 +218,19 @@ function CreateDesktopDecoder() { //console.log('SendToViewer', data.length); if (viewer.sending) { viewer.sendQueue.push(data); - // TODO: Flow control, stop the agent } else { viewer.sending = true; - obj.viewersSendingCount++; viewer.ws.send(data, function () { sendViewerNext(viewer); }); + + // Flow control, pause the agent if needed + obj.viewersSendingCount++; + if ((obj.viewersSendingCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); } } } // Send more data to the viewer function sendViewerNext(viewer) { + if (viewer.sendQueue == null) return; if (viewer.sendQueue.length > 0) { // Send from the pending send queue if (viewer.sending == false) { viewer.sending = true; obj.viewersSendingCount++; } @@ -199,13 +243,21 @@ function CreateDesktopDecoder() { viewer.lastImageNumberSent = viewer.dataPtr; if ((image.next != null) && ((viewer.dataPtr + 1) != image.next)) { console.log('SVIEW-S2', viewer.dataPtr, image.next); } // DEBUG viewer.dataPtr = image.next; - if (viewer.sending == false) { viewer.sending = true; obj.viewersSendingCount++; } viewer.ws.send(image.data, function () { sendViewerNext(viewer); }); + + // Flow control, pause the agent if needed + if (viewer.sending == false) { + viewer.sending = true; + obj.viewersSendingCount++; + if ((obj.viewersSendingCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); } + } } else { // Nothing to send viewer.sending = false; + + // Flow control, resume agent if needed obj.viewersSendingCount--; - // TODO: Flow control, start agent + if ((obj.viewersSendingCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); } } } } @@ -470,7 +522,7 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie } // If there is no authentication, drop this connection - if ((obj.id != null) && (obj.id.startsWith('meshmessenger/') == false) && (obj.user == null) && (obj.ruserid == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Connection with no authentication (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } return; } + if ((obj.id != null) && (obj.user == null) && (obj.ruserid == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Connection with no authentication (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } return; } // Relay session count (we may remove this in the future) obj.relaySessionCounted = true; @@ -500,13 +552,25 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie // Disconnect this agent obj.close = function (arg) { + if (obj.ws == null) return; // Already closed. + + // Close the connection if ((arg == 1) || (arg == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Soft disconnect (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } } // Soft close, close the websocket if (arg == 2) { try { ws._socket._parent.end(); parent.parent.debug('relay', 'Relay: Hard disconnect (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } } // Hard close, close the TCP socket + if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; } + if (obj.deskDecoder != null) { if (obj.deskDecoder.removePeer(obj) == true) { delete parent.desktoprelays[obj.id]; } } // Aggressive cleanup delete obj.id; delete obj.ws; - delete obj.peer; + delete obj.req; + delete obj.user; + delete obj.ruserid; + delete obj.deskDecoder; + + // Clear timers if present + if (obj.pingtimer != null) { clearInterval(obj.pingtimer); delete obj.pingtimer; } + if (obj.pongtimer != null) { clearInterval(obj.pongtimer); delete obj.pongtimer; } }; obj.sendAgentMessage = function (command, userid, domainid) { @@ -572,6 +636,31 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie if (obj.id == null) { try { obj.close(); } catch (e) { } return null; } // Attempt to connect without id, drop this. ws._socket.setKeepAlive(true, 240000); // Set TCP keep alive + // Validate that the id is valid, we only need to do this on non-authenticated sessions. + // TODO: Figure out when this needs to be done. + if ((user == null) && (!parent.args.notls)) { + // Check the identifier, if running without TLS, skip this. + var ids = obj.id.split(':'); + if (ids.length != 3) { ws.close(); delete obj.id; return null; } // Invalid ID, drop this. + if (parent.crypto.createHmac('SHA384', parent.relayRandom).update(ids[0] + ':' + ids[1]).digest('hex') != ids[2]) { ws.close(); delete obj.id; return null; } // Invalid HMAC, drop this. + if ((Date.now() - parseInt(ids[1])) > 120000) { ws.close(); delete obj.id; return null; } // Expired time, drop this. + obj.id = ids[0]; + } + + // Setup the agent PING/PONG timers + if ((typeof parent.parent.args.agentping == 'number') && (obj.pingtimer == null)) { obj.pingtimer = setInterval(sendPing, parent.parent.args.agentping * 1000); } + else if ((typeof parent.parent.args.agentpong == 'number') && (obj.pongtimer == null)) { obj.pongtimer = setInterval(sendPong, parent.parent.args.agentpong * 1000); } + + // Create if needed and add this peer to the desktop multiplexor + obj.deskDecoder = parent.desktoprelays[obj.id]; + if (obj.deskDecoder == null) { + obj.deskDecoder = CreateDesktopDecoder(); + parent.desktoprelays[obj.id] = obj.deskDecoder; + } + obj.deskDecoder.addPeer(obj); + ws._socket.resume(); // Release the traffic + + /* // If this is a MeshMessenger session, the ID is the two userid's and authentication must match one of them. if (obj.id.startsWith('meshmessenger/')) { if ((obj.id.startsWith('meshmessenger/user/') == true) && (user == null)) { try { obj.close(); } catch (e) { } return null; } // If user-to-user, both sides need to be authenticated. @@ -588,7 +677,6 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie // Validate that the id is valid, we only need to do this on non-authenticated sessions. // TODO: Figure out when this needs to be done. - /* if (!parent.args.notls) { // Check the identifier, if running without TLS, skip this. var ids = obj.id.split(':'); @@ -597,7 +685,6 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie if ((Date.now() - parseInt(ids[1])) > 120000) { ws.close(); delete obj.id; return null; } // Expired time, drop this. obj.id = ids[0]; } - */ // Check the peer connection status { @@ -746,48 +833,30 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie } } } + */ } - ws.flushSink = function () { try { ws._socket.resume(); } catch (ex) { console.log(ex); } }; - // When data is received from the mesh relay web socket ws.on('message', function (data) { - // If this data was received by the agent, decode it. + // If this data was received by the agent, decode it. if (this.me.deskDecoder != null) { this.me.deskDecoder.processData(this.me, data); } - - /* - //console.log(typeof data, data.length); - if (this.peer != null) { - //if (typeof data == 'string') { console.log('Relay: ' + data); } else { console.log('Relay:' + data.length + ' byte(s)'); } - try { - this._socket.pause(); - if (this.logfile != null) { - // Write data to log file then perform relay - var xthis = this; - recordingEntry(this.logfile.fd, 2, ((obj.req.query.browser) ? 2 : 0), data, function () { xthis.peer.send(data, ws.flushSink); }); - } else { - // Perform relay - this.peer.send(data, ws.flushSink); - } - } catch (ex) { console.log(ex); } - } - */ }); // If error, close both sides of the relay. ws.on('error', function (err) { + //console.log('ws-error', err); parent.relaySessionErrorCount++; - if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; } console.log('Relay error from ' + cleanRemoteAddr(obj.req.ip) + ', ' + err.toString().split('\r')[0] + '.'); - closeBothSides(); + obj.close(); }); // If the relay web socket is closed, close both sides. ws.on('close', function (req) { - if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; } - closeBothSides(); + //console.log('ws-close', req); + obj.close(); }); + /* // Close both our side and the peer side. function closeBothSides() { if (obj.id != null) { @@ -876,6 +945,7 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie } } catch (ex) { console.log(ex); func(fd, tag); } } + */ // Mark this relay session as authenticated if this is the user end. obj.authenticated = (user != null); diff --git a/webserver.js b/webserver.js index 612a3930..9e13db89 100644 --- a/webserver.js +++ b/webserver.js @@ -172,6 +172,7 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) { obj.wsPeerSessions3 = {}; // ServerId --> UserId --> [ SessionId ] obj.sessionsCount = {}; // Merged session counters, used when doing server peering. UserId --> SessionCount obj.wsrelays = {}; // Id -> Relay + obj.desktoprelays = {}; // Id -> Desktop Multiplexor Relay obj.wsPeerRelays = {}; // Id -> { ServerId, Time } var tlsSessionStore = {}; // Store TLS session information for quick resume. var tlsSessionStoreCount = 0; // Number of cached TLS session information in store.