More work on desktop multiplexor.

This commit is contained in:
Ylian Saint-Hilaire 2020-04-27 16:25:54 -07:00
parent a1dffcaf94
commit 1ea47cc7f9
2 changed files with 123 additions and 52 deletions

View File

@ -86,13 +86,11 @@ function CreateDesktopDecoder() {
// Add an agent or viewer
obj.addPeer = function (peer) {
if (peer.req.query.browser) {
console.log('addPeer-viewer');
// This is a viewer
if (obj.viewers.indexOf(peer) >= 0) return true;
obj.viewers.push(peer);
//console.log('addPeer-viewer');
// Setup the viewer
if (obj.viewers.indexOf(peer) >= 0) return true;
obj.viewers.push(peer);
peer.desktopPaused = true;
peer.imageCompression = 30;
peer.imageScaling = 1024;
@ -101,19 +99,22 @@ function CreateDesktopDecoder() {
peer.dataPtr = obj.firstData;
peer.sending = false;
peer.sendQueue = [];
peer.paused = false;
// Indicated we are connected
obj.sendToViewer(peer, 'c');
} else {
console.log('addPeer-agent');
//console.log('addPeer-agent');
if (obj.agent != null) return false;
// This is the agent
obj.agent = peer;
// Setup the agent
obj.agent = peer;
peer.sending = false;
peer.sendQueue = [];
//peer.ws.send('{"tsid":10,"type":"options"}');
//peer.ws.send('2');
peer.paused = false;
// Indicated we are connected and send connection options and protocol if needed
obj.sendToAgent('c');
if (obj.viewerConnected == true) {
if (obj.protocolOptions != null) { obj.sendToAgent(JSON.stringify(obj.protocolOptions)); } // Send connection options
obj.sendToAgent('2'); // Send remote desktop connect
@ -123,23 +124,52 @@ function CreateDesktopDecoder() {
}
// Remove an agent or viewer
// Return true if this multiplexor is no longer needed.
obj.removePeer = function (peer) {
if (peer == agent) {
console.log('removePeer-agent');
if (peer == obj.agent) {
//console.log('removePeer-agent');
// Clean up the agent
obj.agent = null;
// Agent has disconnected, disconnect everyone.
for (var i in obj.viewers) { obj.viewers[i].close(); }
dispose();
return true;
} else {
console.log('removePeer-viewer');
//console.log('removePeer-viewer');
// Remove a viewer
var i = obj.viewers.indexOf(peer);
if (i == -1) return false;
obj.viewers.splice(i, 1);
// Clean up the viewer
// Aggressive clean up of the viewer
delete peer.desktopPaused;
delete peer.imageCompression;
delete peer.imageScaling;
delete peer.imageFrameRate;
delete peer.lastImageNumberSent;
delete peer.dataPtr;
delete peer.sending;
delete peer.sendQueue;
// Resume flow control if this was the peer
if (peer.sending == true) {
obj.viewersSendingCount--;
peer.sending = false;
if ((obj.viewersSendingCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
}
return true;
// If this is the last viewer, disconnect the agent
if ((obj.viewers.length == 0) && (obj.agent != null)) { obj.agent.close(); dispose(); return true; }
}
return false;
}
// Clean up ourselves
function dispose() {
delete obj.viewers;
delete obj.imagesCounters;
delete obj.images;
}
// Send data to the agent or queue it up for sending
@ -148,21 +178,32 @@ function CreateDesktopDecoder() {
//console.log('SendToAgent', data.length);
if (obj.agent.sending) {
obj.agent.sendQueue.push(data);
// TODO: Flow control, stop all viewers
} else {
obj.agent.ws.send(data, sendAgentNext);
// Flow control, pause all viewers
for (var i in obj.viewers) {
var v = obj.viewers[i];
if (v.paused == false) { v.paused = true; v.ws._socket.pause(); }
}
}
}
// Send more data to the agent
function sendAgentNext() {
if (obj.agent == null) return;
if (obj.agent.sendQueue.length > 0) {
// Send from the pending send queue
obj.agent.ws.send(obj.agent.sendQueue.shift(), sendAgentNext);
} else {
// Nothing to send
obj.agent.sending = false;
// TODO: Flow control, start all viewers
// Flow control, resume all viewers
for (var i in obj.viewers) {
var v = obj.viewers[i];
if (v.paused == true) { v.paused = false; v.ws._socket.resume(); }
}
}
}
@ -177,16 +218,19 @@ function CreateDesktopDecoder() {
//console.log('SendToViewer', data.length);
if (viewer.sending) {
viewer.sendQueue.push(data);
// TODO: Flow control, stop the agent
} else {
viewer.sending = true;
obj.viewersSendingCount++;
viewer.ws.send(data, function () { sendViewerNext(viewer); });
// Flow control, pause the agent if needed
obj.viewersSendingCount++;
if ((obj.viewersSendingCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); }
}
}
// Send more data to the viewer
function sendViewerNext(viewer) {
if (viewer.sendQueue == null) return;
if (viewer.sendQueue.length > 0) {
// Send from the pending send queue
if (viewer.sending == false) { viewer.sending = true; obj.viewersSendingCount++; }
@ -199,13 +243,21 @@ function CreateDesktopDecoder() {
viewer.lastImageNumberSent = viewer.dataPtr;
if ((image.next != null) && ((viewer.dataPtr + 1) != image.next)) { console.log('SVIEW-S2', viewer.dataPtr, image.next); } // DEBUG
viewer.dataPtr = image.next;
if (viewer.sending == false) { viewer.sending = true; obj.viewersSendingCount++; }
viewer.ws.send(image.data, function () { sendViewerNext(viewer); });
// Flow control, pause the agent if needed
if (viewer.sending == false) {
viewer.sending = true;
obj.viewersSendingCount++;
if ((obj.viewersSendingCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); }
}
} else {
// Nothing to send
viewer.sending = false;
// Flow control, resume agent if needed
obj.viewersSendingCount--;
// TODO: Flow control, start agent
if ((obj.viewersSendingCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
}
}
}
@ -470,7 +522,7 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
}
// If there is no authentication, drop this connection
if ((obj.id != null) && (obj.id.startsWith('meshmessenger/') == false) && (obj.user == null) && (obj.ruserid == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Connection with no authentication (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } return; }
if ((obj.id != null) && (obj.user == null) && (obj.ruserid == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Connection with no authentication (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } return; }
// Relay session count (we may remove this in the future)
obj.relaySessionCounted = true;
@ -500,13 +552,25 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
// Disconnect this agent
obj.close = function (arg) {
if (obj.ws == null) return; // Already closed.
// Close the connection
if ((arg == 1) || (arg == null)) { try { ws.close(); parent.parent.debug('relay', 'Relay: Soft disconnect (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } } // Soft close, close the websocket
if (arg == 2) { try { ws._socket._parent.end(); parent.parent.debug('relay', 'Relay: Hard disconnect (' + cleanRemoteAddr(obj.req.ip) + ')'); } catch (e) { console.log(e); } } // Hard close, close the TCP socket
if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; }
if (obj.deskDecoder != null) { if (obj.deskDecoder.removePeer(obj) == true) { delete parent.desktoprelays[obj.id]; } }
// Aggressive cleanup
delete obj.id;
delete obj.ws;
delete obj.peer;
delete obj.req;
delete obj.user;
delete obj.ruserid;
delete obj.deskDecoder;
// Clear timers if present
if (obj.pingtimer != null) { clearInterval(obj.pingtimer); delete obj.pingtimer; }
if (obj.pongtimer != null) { clearInterval(obj.pongtimer); delete obj.pongtimer; }
};
obj.sendAgentMessage = function (command, userid, domainid) {
@ -572,6 +636,31 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
if (obj.id == null) { try { obj.close(); } catch (e) { } return null; } // Attempt to connect without id, drop this.
ws._socket.setKeepAlive(true, 240000); // Set TCP keep alive
// Validate that the id is valid, we only need to do this on non-authenticated sessions.
// TODO: Figure out when this needs to be done.
if ((user == null) && (!parent.args.notls)) {
// Check the identifier, if running without TLS, skip this.
var ids = obj.id.split(':');
if (ids.length != 3) { ws.close(); delete obj.id; return null; } // Invalid ID, drop this.
if (parent.crypto.createHmac('SHA384', parent.relayRandom).update(ids[0] + ':' + ids[1]).digest('hex') != ids[2]) { ws.close(); delete obj.id; return null; } // Invalid HMAC, drop this.
if ((Date.now() - parseInt(ids[1])) > 120000) { ws.close(); delete obj.id; return null; } // Expired time, drop this.
obj.id = ids[0];
}
// Setup the agent PING/PONG timers
if ((typeof parent.parent.args.agentping == 'number') && (obj.pingtimer == null)) { obj.pingtimer = setInterval(sendPing, parent.parent.args.agentping * 1000); }
else if ((typeof parent.parent.args.agentpong == 'number') && (obj.pongtimer == null)) { obj.pongtimer = setInterval(sendPong, parent.parent.args.agentpong * 1000); }
// Create if needed and add this peer to the desktop multiplexor
obj.deskDecoder = parent.desktoprelays[obj.id];
if (obj.deskDecoder == null) {
obj.deskDecoder = CreateDesktopDecoder();
parent.desktoprelays[obj.id] = obj.deskDecoder;
}
obj.deskDecoder.addPeer(obj);
ws._socket.resume(); // Release the traffic
/*
// If this is a MeshMessenger session, the ID is the two userid's and authentication must match one of them.
if (obj.id.startsWith('meshmessenger/')) {
if ((obj.id.startsWith('meshmessenger/user/') == true) && (user == null)) { try { obj.close(); } catch (e) { } return null; } // If user-to-user, both sides need to be authenticated.
@ -588,7 +677,6 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
// Validate that the id is valid, we only need to do this on non-authenticated sessions.
// TODO: Figure out when this needs to be done.
/*
if (!parent.args.notls) {
// Check the identifier, if running without TLS, skip this.
var ids = obj.id.split(':');
@ -597,7 +685,6 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
if ((Date.now() - parseInt(ids[1])) > 120000) { ws.close(); delete obj.id; return null; } // Expired time, drop this.
obj.id = ids[0];
}
*/
// Check the peer connection status
{
@ -746,48 +833,30 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
}
}
}
*/
}
ws.flushSink = function () { try { ws._socket.resume(); } catch (ex) { console.log(ex); } };
// When data is received from the mesh relay web socket
ws.on('message', function (data) {
// If this data was received by the agent, decode it.
if (this.me.deskDecoder != null) { this.me.deskDecoder.processData(this.me, data); }
/*
//console.log(typeof data, data.length);
if (this.peer != null) {
//if (typeof data == 'string') { console.log('Relay: ' + data); } else { console.log('Relay:' + data.length + ' byte(s)'); }
try {
this._socket.pause();
if (this.logfile != null) {
// Write data to log file then perform relay
var xthis = this;
recordingEntry(this.logfile.fd, 2, ((obj.req.query.browser) ? 2 : 0), data, function () { xthis.peer.send(data, ws.flushSink); });
} else {
// Perform relay
this.peer.send(data, ws.flushSink);
}
} catch (ex) { console.log(ex); }
}
*/
});
// If error, close both sides of the relay.
ws.on('error', function (err) {
//console.log('ws-error', err);
parent.relaySessionErrorCount++;
if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; }
console.log('Relay error from ' + cleanRemoteAddr(obj.req.ip) + ', ' + err.toString().split('\r')[0] + '.');
closeBothSides();
obj.close();
});
// If the relay web socket is closed, close both sides.
ws.on('close', function (req) {
if (obj.relaySessionCounted) { parent.relaySessionCount--; delete obj.relaySessionCounted; }
closeBothSides();
//console.log('ws-close', req);
obj.close();
});
/*
// Close both our side and the peer side.
function closeBothSides() {
if (obj.id != null) {
@ -876,6 +945,7 @@ module.exports.CreateMeshRelay = function (parent, ws, req, domain, user, cookie
}
} catch (ex) { console.log(ex); func(fd, tag); }
}
*/
// Mark this relay session as authenticated if this is the user end.
obj.authenticated = (user != null);

View File

@ -172,6 +172,7 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) {
obj.wsPeerSessions3 = {}; // ServerId --> UserId --> [ SessionId ]
obj.sessionsCount = {}; // Merged session counters, used when doing server peering. UserId --> SessionCount
obj.wsrelays = {}; // Id -> Relay
obj.desktoprelays = {}; // Id -> Desktop Multiplexor Relay
obj.wsPeerRelays = {}; // Id -> { ServerId, Time }
var tlsSessionStore = {}; // Store TLS session information for quick resume.
var tlsSessionStoreCount = 0; // Number of cached TLS session information in store.