From 2954a56fce096d5900a39146d829971f616868e9 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 25 Mar 2021 23:01:38 -0700 Subject: [PATCH] send keepalives on live.m4s Chrome appears to time out at 60 seconds of inactivity otherwise. I think it's better to keep the stream open, even if the camera is broken. The implementation looks awkward, but that might be the state of Rust async right now. --- design/api.md | 2 ++ server/Cargo.lock | 13 ++++----- server/Cargo.toml | 3 ++- server/src/web.rs | 67 +++++++++++++++++++++++++++++++++-------------- 4 files changed, 58 insertions(+), 27 deletions(-) diff --git a/design/api.md b/design/api.md index dd61a80..0081832 100644 --- a/design/api.md +++ b/design/api.md @@ -490,6 +490,8 @@ followed by by a `.mp4` media segment. The following headers will be included: * `X-Media-Time-Range`: the relative media start and end times of these frames within the recording, as a half-open interval. +The server will also send pings, currently at 30-second intervals. + The WebSocket will always open immediately but will receive messages only while the backing RTSP stream is connected. diff --git a/server/Cargo.lock b/server/Cargo.lock index d31f055..8f6b5d1 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1256,6 +1256,7 @@ dependencies = [ "tempdir", "time", "tokio", + "tokio-stream", "tokio-tungstenite", "url", "uuid", @@ -2354,9 +2355,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.1.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195" +checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" dependencies = [ "autocfg", "bytes", @@ -2374,9 +2375,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.8", @@ -2385,9 +2386,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.2" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd" +checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0" dependencies = [ "futures-core", "pin-project-lite", diff --git a/server/Cargo.toml b/server/Cargo.toml index b0ee0e8..3bcdf6c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -57,7 +57,8 @@ smallvec = "1.0" structopt = { version = "0.3.13", features = ["default", "wrap_help"] } sync_wrapper = "0.1.0" time = "0.1" -tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal"] } +tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] } +tokio-stream = "0.1.5" tokio-tungstenite = "0.13.0" url = "2.1.1" uuid = { version = "0.8", features = ["serde", "std", "v4"] } diff --git a/server/src/web.rs b/server/src/web.rs index d453360..31d06ed 100644 --- a/server/src/web.rs +++ b/server/src/web.rs @@ -14,8 +14,8 @@ use db::dir::SampleFileDir; use db::{auth, recording}; use failure::{bail, format_err, Error}; use fnv::FnvHashMap; -use futures::sink::SinkExt; use futures::stream::StreamExt; +use futures::{future::Either, sink::SinkExt}; use http::header::{self, HeaderValue}; use http::{status::StatusCode, Request, Response}; use http_serve::dir::FsDir; @@ -443,7 +443,7 @@ impl Service { stream_id: i32, open_id: u32, req: hyper::Request, - mut sub_rx: futures::channel::mpsc::UnboundedReceiver, + sub_rx: futures::channel::mpsc::UnboundedReceiver, ) { let upgraded = match hyper::upgrade::on(req).await { Ok(u) => u, @@ -452,33 +452,60 @@ impl Service { return; } }; - let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, None, ) .await; - // Start the first segment at a key frame to reduce startup latency. - let mut start_at_key = true; - loop { - let live = match sub_rx.next().await { - Some(l) => l, - None => return, - }; - - info!("chunk: is_key={:?}", live.is_key); - if let Err(e) = self - .stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key) - .await - { - info!("Dropping WebSocket after error: {}", e); - return; - } - start_at_key = false; + if let Err(e) = self + .stream_live_m4s_ws_loop(stream_id, open_id, sub_rx, ws) + .await + { + info!("Dropping WebSocket after error: {}", e); } } + /// Helper for `stream_live_m4s_ws` that returns error when the stream is dropped. + /// The outer function logs the error. + async fn stream_live_m4s_ws_loop( + self: Arc, + stream_id: i32, + open_id: u32, + sub_rx: futures::channel::mpsc::UnboundedReceiver, + mut ws: tokio_tungstenite::WebSocketStream, + ) -> Result<(), Error> { + let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( + std::time::Duration::new(30, 0), + )); + let mut combo = futures::stream::select( + sub_rx.map(|s| Either::Left(s)), + keepalive.map(|_| Either::Right(())), + ); + + // On the first LiveSegment, send all the data from the previous key frame onward. + // For LiveSegments, it's okay to send a single non-key frame at a time. + let mut start_at_key = true; + loop { + let next = combo + .next() + .await + .unwrap_or_else(|| unreachable!("timer stream never ends")); + match next { + Either::Left(live) => { + self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key) + .await?; + start_at_key = false; + } + Either::Right(_) => { + ws.send(tungstenite::Message::Ping(Vec::new())).await?; + } + } + } + } + + /// Sends a single live segment chunk of a `live.m4s` stream. async fn stream_live_m4s_chunk( &self, open_id: u32,