/*
 * Websock: high-performance buffering wrapper
 * Copyright (C) 2019 The noVNC Authors
 * Licensed under MPL 2.0 (see LICENSE.txt)
 *
 * Websock is similar to the standard WebSocket / RTCDataChannel object
 * but with extra buffer handling.
 *
 * Websock has built-in receive queue buffering; the message event
 * does not contain actual data but is simply a notification that
 * there is new data available. Several rQ* methods are available to
 * read binary data off of the receive queue.
 */

import * as Log from './util/logging.js';

// this has performance issues in some versions Chromium, and
// doesn't gain a tremendous amount of performance increase in Firefox
// at the moment.  It may be valuable to turn it on in the future.
// NOTE(review): the note above does not appear to describe the constant
// below (there is nothing here to "turn on"); presumably it refers to a
// setting that is no longer part of this file -- confirm and remove.

// Upper bound for the receive queue buffer; _expandCompactRQ() never
// grows the buffer beyond this size.
const MAX_RQ_GROW_SIZE = 40 * 1024 * 1024;  // 40 MiB

// Constants pulled from RTCDataChannelState enum
// https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/readyState#RTCDataChannelState_enum
const DataChannel = {
    CONNECTING: "connecting",
    OPEN: "open",
    CLOSING: "closing",
    CLOSED: "closed"
};

// For each logical state, the raw readyState values (WebSocket numeric
// constant and RTCDataChannel string) that map onto it.
const ReadyStates = {
    CONNECTING: [WebSocket.CONNECTING, DataChannel.CONNECTING],
    OPEN: [WebSocket.OPEN, DataChannel.OPEN],
    CLOSING: [WebSocket.CLOSING, DataChannel.CLOSING],
    CLOSED: [WebSocket.CLOSED, DataChannel.CLOSED],
};

// Properties a raw channel must have, WebSocket and RTCDataChannel are two examples
const rawChannelProps = [
    "send",
    "close",
    "binaryType",
    "onerror",
    "onmessage",
    "onopen",
    "protocol",
    "readyState",
];

export default class Websock {
    constructor() {
        this._websocket = null;  // WebSocket or RTCDataChannel object

        this._rQi = 0;           // Receive queue index
        this._rQlen = 0;         // Next write position in the receive queue
        this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB)
        // called in init: this._rQ = new Uint8Array(this._rQbufferSize);
        this._rQ = null; // Receive queue

        this._sQbufferSize = 1024 * 10;  // 10 KiB
        // called in init: this._sQ = new Uint8Array(this._sQbufferSize);
        this._sQlen = 0;         // Number of bytes currently queued for sending
        this._sQ = null;  // Send queue

        // Handlers default to no-ops so they can always be called
        this._eventHandlers = {
            message: () => {},
            open: () => {},
            close: () => {},
            error: () => {}
        };
    }

    // Getters and Setters

    // Returns the state of the underlying channel as one of "unused"
    // (no channel attached), "connecting", "open", "closing", "closed"
    // or "unknown" (the raw readyState matched none of the known values).
    get readyState() {
        if (this._websocket === null) {
            return "unused";
        }

        const subState = this._websocket.readyState;

        if (ReadyStates.CONNECTING.includes(subState)) {
            return "connecting";
        } else if (ReadyStates.OPEN.includes(subState)) {
            return "open";
        } else if (ReadyStates.CLOSING.includes(subState)) {
            return "closing";
        } else if (ReadyStates.CLOSED.includes(subState)) {
            return "closed";
        }

        return "unknown";
    }

    // Receive Queue

    // Returns the next unread byte without consuming it
    rQpeek8() {
        return this._rQ[this._rQi];
    }

    // Advances the read index by 'bytes' without returning any data
    rQskipBytes(bytes) {
        this._rQi += bytes;
    }

    // Consume 1, 2 or 4 bytes and return them as an unsigned
    // big-endian integer
    rQshift8() {
        return this._rQshift(1);
    }

    rQshift16() {
        return this._rQshift(2);
    }

    rQshift32() {
        return this._rQshift(4);
    }

    // TODO(directxman12): test performance with these vs a DataView
    _rQshift(bytes) {
        let res = 0;
        for (let byte = bytes - 1; byte >= 0; byte--) {
            res += this._rQ[this._rQi++] << (byte * 8);
        }
        // The shift by 24 can yield a negative number, so convert the
        // result back to an unsigned 32 bit value
        return res >>> 0;
    }

    // Consumes 'len' bytes and returns them as a string with one
    // character (code unit) per byte
    rQshiftStr(len) {
        let str = "";
        // Handle large arrays in steps to avoid long strings on the stack
        for (let i = 0; i < len; i += 4096) {
            const part = this.rQshiftBytes(Math.min(4096, len - i), false);
            str += String.fromCharCode.apply(null, part);
        }
        return str;
    }

    // Consumes 'len' bytes. With copy=true (the default) a new array is
    // returned, otherwise a view that shares memory with the receive
    // queue is returned.
    rQshiftBytes(len, copy=true) {
        this._rQi += len;
        if (copy) {
            return this._rQ.slice(this._rQi - len, this._rQi);
        } else {
            return this._rQ.subarray(this._rQi - len, this._rQi);
        }
    }

    // Consumes 'len' bytes by copying them to the start of 'target'
    rQshiftTo(target, len) {
        // TODO: make this just use set with views when using a ArrayBuffer to store the rQ
        target.set(new Uint8Array(this._rQ.buffer, this._rQi, len));
        this._rQi += len;
    }

    // Like rQshiftBytes(), but leaves the read index untouched
    rQpeekBytes(len, copy=true) {
        if (copy) {
            return this._rQ.slice(this._rQi, this._rQi + len);
        } else {
            return this._rQ.subarray(this._rQi, this._rQi + len);
        }
    }

    // Check to see if we must wait for 'num' bytes to be available in
    // the receive queue. Return true if we need to wait, otherwise
    // false.
    //
    // If we need to wait and 'goback' is given, then the read index is
    // first moved back 'goback' bytes. An Error is thrown if fewer than
    // 'goback' bytes have been read from the queue.
    //
    // 'msg' is accepted but not used by this method.
    rQwait(msg, num, goback) {
        if (this._rQlen - this._rQi < num) {
            if (goback) {
                if (this._rQi < goback) {
                    throw new Error("rQwait cannot backup " + goback + " bytes");
                }
                this._rQi -= goback;
            }
            return true; // true means need more data
        }
        return false;
    }

    // Send Queue

    // Queue an 8, 16 or 32 bit integer in big-endian byte order
    sQpush8(num) {
        this._sQensureSpace(1);
        this._sQ[this._sQlen++] = num;
    }

    sQpush16(num) {
        this._sQensureSpace(2);
        this._sQ[this._sQlen++] = (num >> 8) & 0xff;
        this._sQ[this._sQlen++] = (num >> 0) & 0xff;
    }

    sQpush32(num) {
        this._sQensureSpace(4);
        this._sQ[this._sQlen++] = (num >> 24) & 0xff;
        this._sQ[this._sQlen++] = (num >> 16) & 0xff;
        this._sQ[this._sQlen++] = (num >>  8) & 0xff;
        this._sQ[this._sQlen++] = (num >>  0) & 0xff;
    }

    // Queues one byte per character (code unit) of 'str'. The values
    // are stored in a Uint8Array, so only the low 8 bits of each
    // character code are kept.
    sQpushString(str) {
        const bytes = str.split('').map(chr => chr.charCodeAt(0));
        this.sQpushBytes(new Uint8Array(bytes));
    }

    // Queues all of 'bytes' (a Uint8Array), flushing whenever the send
    // queue fills up so that data larger than the queue can be pushed
    sQpushBytes(bytes) {
        for (let offset = 0; offset < bytes.length;) {
            // Guarantees room for at least one byte (or throws), so
            // every iteration makes progress
            this._sQensureSpace(1);

            const chunkSize = Math.min(this._sQbufferSize - this._sQlen,
                                       bytes.length - offset);

            this._sQ.set(bytes.subarray(offset, offset + chunkSize), this._sQlen);
            this._sQlen += chunkSize;
            offset += chunkSize;
        }
    }

    // Sends everything in the send queue as a single message. Nothing
    // is sent, and the queue is left as is, if the queue is empty or
    // the channel isn't open.
    flush() {
        if (this._sQlen > 0 && this.readyState === 'open') {
            this._websocket.send(new Uint8Array(this._sQ.buffer, 0, this._sQlen));
            this._sQlen = 0;
        }
    }

    // Makes sure there is room for 'bytes' more bytes in the send
    // queue, flushing it if necessary. Throws an Error if the room
    // can't be made, e.g. because the channel isn't open and the queue
    // therefore can't be flushed. Without this check callers would
    // write past the end of the queue, or loop forever waiting for
    // space.
    _sQensureSpace(bytes) {
        if (this._sQbufferSize - this._sQlen < bytes) {
            this.flush();
        }
        if (this._sQbufferSize - this._sQlen < bytes) {
            throw new Error("Send queue is full and could not be flushed");
        }
    }

    // Event Handlers

    // Replaces the handler for 'evt' with a no-op
    off(evt) {
        this._eventHandlers[evt] = () => {};
    }

    // Installs 'handler' as the (single) handler for 'evt'
    on(evt, handler) {
        this._eventHandlers[evt] = handler;
    }

    _allocateBuffers() {
        this._rQ = new Uint8Array(this._rQbufferSize);
        this._sQ = new Uint8Array(this._sQbufferSize);
    }

    // Resets the object to its unattached state with fresh, empty queues
    init() {
        this._allocateBuffers();
        this._rQi = 0;
        // The buffers are brand new, so nothing from a previous channel
        // may be counted as received or as pending to be sent
        this._rQlen = 0;
        this._sQlen = 0;
        this._websocket = null;
    }

    // Creates a new WebSocket for 'uri' and 'protocols' and attaches it
    open(uri, protocols) {
        this.attach(new WebSocket(uri, protocols));
    }

    // Resets this object and starts using 'rawChannel' (a WebSocket,
    // RTCDataChannel or compatible object) as the underlying channel.
    // Throws an Error if the channel lacks any of the properties listed
    // in rawChannelProps.
    attach(rawChannel) {
        this.init();

        // Must get object and class methods to be compatible with the tests.
        const channelProps = [...Object.keys(rawChannel), ...Object.getOwnPropertyNames(Object.getPrototypeOf(rawChannel))];
        for (const prop of rawChannelProps) {
            if (!channelProps.includes(prop)) {
                throw new Error('Raw channel missing property: ' + prop);
            }
        }

        this._websocket = rawChannel;
        this._websocket.binaryType = "arraybuffer";
        this._websocket.onmessage = this._recvMessage.bind(this);

        this._websocket.onopen = () => {
            Log.Debug('>> WebSock.onopen');
            if (this._websocket.protocol) {
                Log.Info("Server choose sub-protocol: " + this._websocket.protocol);
            }

            this._eventHandlers.open();
            Log.Debug("<< WebSock.onopen");
        };

        this._websocket.onclose = (e) => {
            Log.Debug(">> WebSock.onclose");
            this._eventHandlers.close(e);
            Log.Debug("<< WebSock.onclose");
        };

        this._websocket.onerror = (e) => {
            Log.Debug(">> WebSock.onerror: " + e);
            this._eventHandlers.error(e);
            Log.Debug("<< WebSock.onerror: " + e);
        };
    }

    // Closes the underlying channel if it is connecting or open, and
    // stops delivering incoming messages to the receive queue
    close() {
        if (this._websocket) {
            if (this.readyState === 'connecting' ||
                this.readyState === 'open') {
                Log.Info("Closing WebSocket connection");
                this._websocket.close();
            }

            this._websocket.onmessage = () => {};
        }
    }

    // private methods

    // We want to move all the unread data to the start of the queue,
    // e.g. compacting.
    // The function also expands the receive queue if needed, and for
    // performance reasons we combine these two actions to avoid
    // unnecessary copying.
    //
    // 'minFit' is the number of incoming bytes that must fit after the
    // unread data. Throws an Error if they can't fit even with the
    // buffer at MAX_RQ_GROW_SIZE.
    _expandCompactRQ(minFit) {
        // if we're using less than 1/8th of the buffer even with the incoming bytes, compact in place
        // instead of resizing
        const requiredBufferSize =  (this._rQlen - this._rQi + minFit) * 8;
        const resizeNeeded = this._rQbufferSize < requiredBufferSize;

        if (resizeNeeded) {
            // Make sure we always *at least* double the buffer size, and have at least space for 8x
            // the current amount of data
            this._rQbufferSize = Math.max(this._rQbufferSize * 2, requiredBufferSize);
        }

        // we don't want to grow unboundedly
        if (this._rQbufferSize > MAX_RQ_GROW_SIZE) {
            this._rQbufferSize = MAX_RQ_GROW_SIZE;
            if (this._rQbufferSize - (this._rQlen - this._rQi) < minFit) {
                throw new Error("Receive Queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit");
            }
        }

        if (resizeNeeded) {
            // Allocate the bigger buffer and copy only the unread data
            // to the start of it
            const oldRQbuffer = this._rQ.buffer;
            this._rQ = new Uint8Array(this._rQbufferSize);
            this._rQ.set(new Uint8Array(oldRQbuffer, this._rQi, this._rQlen - this._rQi));
        } else {
            // Enough room already, just move the unread data to the start
            this._rQ.copyWithin(0, this._rQi, this._rQlen);
        }

        this._rQlen = this._rQlen - this._rQi;
        this._rQi = 0;
    }

    // push arraybuffer values onto the end of the receive queue
    _recvMessage(e) {
        if (this._rQlen === this._rQi) {
            // All data has now been processed, this means we
            // can reset the receive queue.
            this._rQlen = 0;
            this._rQi = 0;
        }
        const u8 = new Uint8Array(e.data);
        if (u8.length > this._rQbufferSize - this._rQlen) {
            this._expandCompactRQ(u8.length);
        }
        this._rQ.set(u8, this._rQlen);
        this._rQlen += u8.length;

        // Only notify the consumer if there actually is unread data
        if (this._rQlen - this._rQi > 0) {
            this._eventHandlers.message();
        } else {
            Log.Debug("Ignoring empty message");
        }
    }
}