Boosted speed of desktop multiplexor.

This commit is contained in:
Ylian Saint-Hilaire 2020-05-01 22:08:49 -07:00
parent ad8bb46718
commit 67f62cd00f
1 changed files with 53 additions and 28 deletions

View File

@ -63,7 +63,7 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
obj.parent = parent; obj.parent = parent;
obj.agent = null; // Reference to the connection object that is the agent. obj.agent = null; // Reference to the connection object that is the agent.
obj.viewers = []; // Array of references to all viewers. obj.viewers = []; // Array of references to all viewers.
obj.viewersSendingCount = 0; // Number of viewers currently activaly sending something. obj.viewersOverflowCount = 0; // Number of viewers currently in overflow state.
obj.width = 0; // Current width of the display in pixels. obj.width = 0; // Current width of the display in pixels.
obj.height = 0; // Current height of the display in pixels. obj.height = 0; // Current height of the display in pixels.
obj.swidth = 0; // Current width of the display in tiles. obj.swidth = 0; // Current width of the display in tiles.
@ -104,6 +104,7 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
peer.lastImageNumberSent = null; peer.lastImageNumberSent = null;
peer.dataPtr = obj.firstData; peer.dataPtr = obj.firstData;
peer.sending = false; peer.sending = false;
peer.overflow = false;
peer.sendQueue = []; peer.sendQueue = [];
peer.paused = false; peer.paused = false;
@ -133,6 +134,7 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
// Setup the agent // Setup the agent
obj.agent = peer; obj.agent = peer;
peer.sending = false; peer.sending = false;
peer.overflow = false;
peer.sendQueue = []; peer.sendQueue = [];
peer.paused = false; peer.paused = false;
@ -172,6 +174,13 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
if (i == -1) return false; if (i == -1) return false;
obj.viewers.splice(i, 1); obj.viewers.splice(i, 1);
// Resume flow control if this was the peer that was limiting traffic (because it was the fastest one).
if (peer.overflow == true) {
obj.viewersOverflowCount--;
peer.overflow = false;
if ((obj.viewersOverflowCount < obj.viewers.length) && (obj.recordingFileWriting == false) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
}
// Aggressive clean up of the viewer // Aggressive clean up of the viewer
delete peer.desktopPaused; delete peer.desktopPaused;
delete peer.imageCompression; delete peer.imageCompression;
@ -180,15 +189,9 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
delete peer.lastImageNumberSent; delete peer.lastImageNumberSent;
delete peer.dataPtr; delete peer.dataPtr;
delete peer.sending; delete peer.sending;
delete peer.overflow;
delete peer.sendQueue; delete peer.sendQueue;
// Resume flow control if this was the peer that was limiting traffic (because it was the fastest one).
if (peer.sending == true) {
obj.viewersSendingCount--;
peer.sending = false;
if ((obj.viewersSendingCount < obj.viewers.length) && (obj.recordingFileWriting == false) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
}
// Log leaving the multiplex session // Log leaving the multiplex session
if (obj.startTime != null) { if (obj.startTime != null) {
var event = { etype: 'relay', action: 'relaylog', domain: domain.id, nodeid: obj.nodeid, userid: peer.user._id, username: peer.user.name, msg: "Left the desktop multiplex session", protocol: 2 }; var event = { etype: 'relay', action: 'relaylog', domain: domain.id, nodeid: obj.nodeid, userid: peer.user._id, username: peer.user.name, msg: "Left the desktop multiplex session", protocol: 2 };
@ -237,14 +240,17 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
//console.log('SendToAgent', data.length); //console.log('SendToAgent', data.length);
if (obj.agent.sending) { if (obj.agent.sending) {
obj.agent.sendQueue.push(data); obj.agent.sendQueue.push(data);
// Flow control, pause all viewers is the queue is backing up
if (obj.agent.sendQueue > 10) {
obj.agent.overflow = true;
for (var i in obj.viewers) {
var v = obj.viewers[i];
if (v.paused == false) { v.paused = true; v.ws._socket.pause(); }
}
}
} else { } else {
obj.agent.ws.send(data, sendAgentNext); obj.agent.ws.send(data, sendAgentNext);
// Flow control, pause all viewers
for (var i in obj.viewers) {
var v = obj.viewers[i];
if (v.paused == false) { v.paused = true; v.ws._socket.pause(); }
}
} }
} }
@ -259,9 +265,12 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
obj.agent.sending = false; obj.agent.sending = false;
// Flow control, resume all viewers // Flow control, resume all viewers
for (var i in obj.viewers) { if (obj.agent.overflow == true) {
var v = obj.viewers[i]; obj.agent.overflow = false;
if (v.paused == true) { v.paused = false; v.ws._socket.resume(); } for (var i in obj.viewers) {
var v = obj.viewers[i];
if (v.paused == true) { v.paused = false; v.ws._socket.resume(); }
}
} }
} }
} }
@ -286,8 +295,27 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
} }
// Flow control, pause the agent if needed // Flow control, pause the agent if needed
obj.viewersSendingCount++; checkViewerOverflow(viewer);
if ((obj.viewersSendingCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); } }
}
// Check if a viewer is in overflow situation
function checkViewerOverflow(viewer) {
if (viewer.overflow == true) return;
if ((viewer.sendQueue.length > 5) || ((viewer.dataPtr != null) && (viewer.dataPtr != obj.lastData))) {
viewer.overflow = true;
obj.viewersOverflowCount++;
if ((obj.viewersOverflowCount >= obj.viewers.length) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); }
}
}
// Check if a viewer is in underflow situation
function checkViewerUnderflow(viewer) {
if (viewer.overflow == false) return;
if ((viewer.sendQueue.length <= 5) && ((viewer.dataPtr == null) || (viewer.dataPtr == obj.lastData))) {
viewer.overflow = false;
obj.viewersOverflowCount--;
if ((obj.viewersOverflowCount < obj.viewers.length) && (obj.recordingFileWriting == false) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
} }
} }
@ -296,12 +324,13 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
if (viewer.sendQueue == null) return; if (viewer.sendQueue == null) return;
if (viewer.sendQueue.length > 0) { if (viewer.sendQueue.length > 0) {
// Send from the pending send queue // Send from the pending send queue
if (viewer.sending == false) { viewer.sending = true; obj.viewersSendingCount++; } if (viewer.sending == false) { viewer.sending = true; }
if (viewer.slowRelay) { if (viewer.slowRelay) {
setTimeout(function () { try { viewer.ws.send(viewer.sendQueue.shift(), function () { sendViewerNext(viewer); }); } catch (ex) { } }, viewer.slowRelay); setTimeout(function () { try { viewer.ws.send(viewer.sendQueue.shift(), function () { sendViewerNext(viewer); }); } catch (ex) { } }, viewer.slowRelay);
} else { } else {
try { viewer.ws.send(viewer.sendQueue.shift(), function () { sendViewerNext(viewer); }); } catch (ex) { } try { viewer.ws.send(viewer.sendQueue.shift(), function () { sendViewerNext(viewer); }); } catch (ex) { }
} }
checkViewerOverflow(viewer);
} else { } else {
if (viewer.dataPtr != null) { if (viewer.dataPtr != null) {
// Send the next image // Send the next image
@ -317,18 +346,14 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
} }
// Flow control, pause the agent if needed // Flow control, pause the agent if needed
if (viewer.sending == false) { if (viewer.sending == false) { viewer.sending = true; }
viewer.sending = true; checkViewerOverflow(viewer);
obj.viewersSendingCount++;
if (((obj.viewersSendingCount >= obj.viewers.length) || (obj.recordingFileWriting == true)) && obj.agent && (obj.agent.paused == false)) { obj.agent.paused = true; obj.agent.ws._socket.pause(); }
}
} else { } else {
// Nothing to send // Nothing to send
viewer.sending = false; viewer.sending = false;
// Flow control, resume agent if needed // Flow control, resume agent if needed
obj.viewersSendingCount--; checkViewerUnderflow(viewer);
if ((obj.viewersSendingCount < obj.viewers.length) && (obj.recordingFileWriting == false) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
} }
} }
} }
@ -339,7 +364,7 @@ function CreateDesktopMultiplexor(parent, domain, nodeid, func) {
obj.recordingFileWriting = true; obj.recordingFileWriting = true;
recordData(true, data, function () { recordData(true, data, function () {
obj.recordingFileWriting = false; obj.recordingFileWriting = false;
if ((obj.viewersSendingCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); } if ((obj.viewersOverflowCount < obj.viewers.length) && obj.agent && (obj.agent.paused == true)) { obj.agent.paused = false; obj.agent.ws._socket.resume(); }
obj.processAgentData(data); obj.processAgentData(data);
}); });
} else { } else {