send keepalives on live.m4s

Chrome appears to time out at 60 seconds of inactivity otherwise.
I think it's better to keep the stream open, even if the camera is
broken.

The implementation looks awkward, but that might be the state of Rust
async right now.
This commit is contained in:
Scott Lamb
2021-03-25 23:01:38 -07:00
parent f9c46dca89
commit 2954a56fce
4 changed files with 58 additions and 27 deletions

13
server/Cargo.lock generated
View File

@@ -1256,6 +1256,7 @@ dependencies = [
"tempdir",
"time",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"url",
"uuid",
@@ -2354,9 +2355,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.1.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195"
checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722"
dependencies = [
"autocfg",
"bytes",
@@ -2374,9 +2375,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.8",
@@ -2385,9 +2386,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.2"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [
"futures-core",
"pin-project-lite",

View File

@@ -57,7 +57,8 @@ smallvec = "1.0"
structopt = { version = "0.3.13", features = ["default", "wrap_help"] }
sync_wrapper = "0.1.0"
time = "0.1"
tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio-stream = "0.1.5"
tokio-tungstenite = "0.13.0"
url = "2.1.1"
uuid = { version = "0.8", features = ["serde", "std", "v4"] }

View File

@@ -14,8 +14,8 @@ use db::dir::SampleFileDir;
use db::{auth, recording};
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::{future::Either, sink::SinkExt};
use http::header::{self, HeaderValue};
use http::{status::StatusCode, Request, Response};
use http_serve::dir::FsDir;
@@ -443,7 +443,7 @@ impl Service {
stream_id: i32,
open_id: u32,
req: hyper::Request<hyper::Body>,
mut sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
) {
let upgraded = match hyper::upgrade::on(req).await {
Ok(u) => u,
@@ -452,33 +452,60 @@ impl Service {
return;
}
};
let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
)
.await;
// Start the first segment at a key frame to reduce startup latency.
let mut start_at_key = true;
loop {
let live = match sub_rx.next().await {
Some(l) => l,
None => return,
};
info!("chunk: is_key={:?}", live.is_key);
if let Err(e) = self
.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key)
.await
{
info!("Dropping WebSocket after error: {}", e);
return;
}
start_at_key = false;
if let Err(e) = self
.stream_live_m4s_ws_loop(stream_id, open_id, sub_rx, ws)
.await
{
info!("Dropping WebSocket after error: {}", e);
}
}
/// Helper for `stream_live_m4s_ws` that returns error when the stream is dropped.
/// The outer function logs the error.
async fn stream_live_m4s_ws_loop(
self: Arc<Self>,
stream_id: i32,
open_id: u32,
sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
mut ws: tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
) -> Result<(), Error> {
let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
std::time::Duration::new(30, 0),
));
let mut combo = futures::stream::select(
sub_rx.map(|s| Either::Left(s)),
keepalive.map(|_| Either::Right(())),
);
// On the first LiveSegment, send all the data from the previous key frame onward.
// For LiveSegments, it's okay to send a single non-key frame at a time.
let mut start_at_key = true;
loop {
let next = combo
.next()
.await
.unwrap_or_else(|| unreachable!("timer stream never ends"));
match next {
Either::Left(live) => {
self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key)
.await?;
start_at_key = false;
}
Either::Right(_) => {
ws.send(tungstenite::Message::Ping(Vec::new())).await?;
}
}
}
}
/// Sends a single live segment chunk of a `live.m4s` stream.
async fn stream_live_m4s_chunk(
&self,
open_id: u32,