diff --git a/ref/api.md b/ref/api.md index 0706a67..17e0060 100644 --- a/ref/api.md +++ b/ref/api.md @@ -654,6 +654,9 @@ However, there are two important differences: * The `/view.m4s` endpoint always returns a time range that starts with a key frame; `/live.m4s` messages may not include a key frame. +If the caller falls too many frames behind, the connection will drop with an +text message error. + Note: an earlier version of this API used a `multipart/mixed` segment instead, compatible with the [multipart-stream-js][multipart-stream-js] library. The problem with this approach is that browsers have low limits on the number of diff --git a/server/Cargo.lock b/server/Cargo.lock index 1d0d217..ef61977 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1204,7 +1204,6 @@ dependencies = [ "tempfile", "time 0.1.45", "tokio", - "tokio-stream", "tokio-tungstenite", "toml", "tracing", @@ -2236,17 +2235,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "tokio-stream" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-tungstenite" version = "0.20.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index df0887e..3e0c748 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -67,7 +67,6 @@ smallvec = { version = "1.7", features = ["union"] } sync_wrapper = "0.1.0" time = "0.1" tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } -tokio-stream = "0.1.5" tokio-tungstenite = "0.20.0" toml = "0.8" tracing = { workspace = true } diff --git a/server/db/db.rs b/server/db/db.rs index 742afce..a4925bd 100644 --- a/server/db/db.rs +++ b/server/db/db.rs @@ -66,6 +66,13 @@ pub const EXPECTED_SCHEMA_VERSION: i32 = 7; /// Make it one less than a power of two so that the data structure's size is efficient. const VIDEO_INDEX_CACHE_LEN: usize = 1023; +/// Maximum number of live segments references to keep. +/// +/// These should only be 16 bytes each, so they're fairly cheap, but we should +/// have some bound in case subscribers are slow, and anyway it's generally +/// not a good experience for subscribers to fall too far behind. +const LIVE_SEGMENTS_BUF_LEN: usize = 128; + const GET_RECORDING_PLAYBACK_SQL: &str = r#" select video_index @@ -500,21 +507,22 @@ pub struct Stream { /// The number of recordings in `uncommitted` which are synced and ready to commit. synced_recordings: usize, - on_live_segment: Vec bool + Send>>, + live_segments: tokio::sync::broadcast::Sender, } -/// Bounds of a live view segment. Currently this is a single frame of video. +/// Bounds of a live view frame. +/// /// This is used for live stream recordings. The stream id should already be known to the /// subscriber. Note this doesn't actually contain the video, just a reference that can be /// looked up within the database. #[derive(Clone, Debug)] -pub struct LiveSegment { +pub struct LiveFrame { pub recording: i32, /// If the segment's one frame is a key frame. pub is_key: bool, - /// The pts, relative to the start of the recording, of the start and end of this live segment, + /// The pts, relative to the start of the recording, of the start and end of this frame, /// in 90kHz units. pub media_off_90k: Range, } @@ -823,7 +831,7 @@ impl StreamStateChanger { cum_runs: 0, uncommitted: VecDeque::new(), synced_recordings: 0, - on_live_segment: Vec::new(), + live_segments: tokio::sync::broadcast::channel(LIVE_SEGMENTS_BUF_LEN).0, }); } (Entry::Vacant(_), None) => {} @@ -962,42 +970,27 @@ impl LockedDatabase { Ok(()) } - /// Registers a callback to run on every live segment immediately after it's recorded. - /// The callback is run with the database lock held, so it must not call back into the database - /// or block. The callback should return false to unregister. + /// Returns a watcher for live segments of the given stream. pub fn watch_live( &mut self, stream_id: i32, - cb: Box bool + Send>, - ) -> Result<(), Error> { + ) -> Result, Error> { let s = match self.streams_by_id.get_mut(&stream_id) { None => bail!(NotFound, msg("no such stream {stream_id}")), Some(s) => s, }; - s.on_live_segment.push(cb); - Ok(()) + Ok(s.live_segments.subscribe()) } - /// Clears all watches on all streams. - /// Normally watches are self-cleaning: when a segment is sent, the callback returns false if - /// it is no longer interested (typically because hyper has just noticed the client is no - /// longer connected). This doesn't work when the system is shutting down and nothing more is - /// sent, though. - pub fn clear_watches(&mut self) { - for s in self.streams_by_id.values_mut() { - s.on_live_segment.clear(); - } - } - - pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> { + pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveFrame) -> Result<(), Error> { let s = match self.streams_by_id.get_mut(&stream) { None => bail!(Internal, msg("no such stream {stream}")), Some(s) => s, }; - // TODO: use std's retain_mut after it's available in our minimum supported Rust version. - // - odds::vec::VecExt::retain_mut(&mut s.on_live_segment, |cb| cb(l.clone())); + // Note that `send` will fail if there are no active receivers. + // That's fine, so ignore this error. + let _ = s.live_segments.send(l); Ok(()) } @@ -1738,7 +1731,7 @@ impl LockedDatabase { cum_runs: row.get(7)?, uncommitted: VecDeque::new(), synced_recordings: 0, - on_live_segment: Vec::new(), + live_segments: tokio::sync::broadcast::channel(LIVE_SEGMENTS_BUF_LEN).0, }, ); c.streams[type_.index()] = Some(id); diff --git a/server/db/writer.rs b/server/db/writer.rs index 5f8b1f3..7bcc526 100644 --- a/server/db/writer.rs +++ b/server/db/writer.rs @@ -915,7 +915,7 @@ impl InnerWriter { db.lock() .send_live_segment( stream_id, - db::LiveSegment { + db::LiveFrame { recording: self.id.recording(), is_key, media_off_90k: prev_media_duration_90k..media_duration_90k, diff --git a/server/src/cmds/run/mod.rs b/server/src/cmds/run/mod.rs index 7c89b9a..0a566fd 100644 --- a/server/src/cmds/run/mod.rs +++ b/server/src/cmds/run/mod.rs @@ -524,8 +524,6 @@ async fn inner( .await .map_err(|e| err!(Unknown, source(e)))?; - db.lock().clear_watches(); - info!("Waiting for HTTP requests to finish."); for h in web_handles { h.await diff --git a/server/src/web/live.rs b/server/src/web/live.rs index da4f793..3a579e6 100644 --- a/server/src/web/live.rs +++ b/server/src/web/live.rs @@ -7,8 +7,9 @@ use std::sync::Arc; use base::{bail, err, Error}; -use futures::{future::Either, SinkExt, StreamExt}; +use futures::SinkExt; use http::header; +use tokio::sync::broadcast::error::RecvError; use tokio_tungstenite::{tungstenite, WebSocketStream}; use uuid::Uuid; @@ -16,6 +17,13 @@ use crate::mp4; use super::{Caller, Service}; +/// Interval at which to send keepalives if there are no frames. +/// +/// Chrome appears to time out WebSockets after 60 seconds of inactivity. +/// If the camera is disconnected or not sending frames, we'd like to keep +/// the connection open so everything will recover when the camera comes back. +const KEEPALIVE_AFTER_IDLE: tokio::time::Duration = tokio::time::Duration::from_secs(30); + impl Service { pub(super) async fn stream_live_m4s( self: Arc, @@ -31,8 +39,7 @@ impl Service { let stream_id; let open_id; - let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); - { + let mut sub_rx = { let mut db = self.db.lock(); open_id = match db.open { None => { @@ -48,45 +55,49 @@ impl Service { .ok_or_else(|| err!(NotFound, msg("no such camera {uuid}")))?; stream_id = camera.streams[stream_type.index()] .ok_or_else(|| err!(NotFound, msg("no such stream {uuid}/{stream_type}")))?; - db.watch_live( - stream_id, - Box::new(move |l| sub_tx.unbounded_send(l).is_ok()), - ) - .expect("stream_id refed by camera"); - } + db.watch_live(stream_id).expect("stream_id refed by camera") + }; - let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( - std::time::Duration::new(30, 0), - )); - let mut combo = futures::stream::select( - sub_rx.map(Either::Left), - keepalive.map(|_| Either::Right(())), - ); + let mut keepalive = tokio::time::interval(KEEPALIVE_AFTER_IDLE); + keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - // On the first LiveSegment, send all the data from the previous key frame onward. - // For LiveSegments, it's okay to send a single non-key frame at a time. + // On the first LiveFrame, send all the data from the previous key frame + // onward. Afterward, send a single (often non-key) frame at a time. let mut start_at_key = true; loop { - let next = combo - .next() - .await - .unwrap_or_else(|| unreachable!("timer stream never ends")); - match next { - Either::Left(live) => { - if !self - .stream_live_m4s_chunk(open_id, stream_id, ws, live, start_at_key) - .await? - { - return Ok(()); + tokio::select! { + biased; + + next = sub_rx.recv() => { + match next { + Ok(l) => { + keepalive.reset_after(KEEPALIVE_AFTER_IDLE); + if !self.stream_live_m4s_chunk( + open_id, + stream_id, + ws, + l, + start_at_key, + ).await? { + return Ok(()); + } + start_at_key = false; + } + Err(RecvError::Closed) => { + bail!(Internal, msg("live stream closed unexpectedly")); + } + Err(RecvError::Lagged(frames)) => { + bail!( + ResourceExhausted, + msg("subscriber {frames} frames further behind than allowed; \ + this typically indicates insufficient bandwidth"), + ) + } } - start_at_key = false; } - Either::Right(_) => { - if ws - .send(tungstenite::Message::Ping(Vec::new())) - .await - .is_err() - { + + _ = keepalive.tick() => { + if ws.send(tungstenite::Message::Ping(Vec::new())).await.is_err() { return Ok(()); } } @@ -101,7 +112,7 @@ impl Service { open_id: u32, stream_id: i32, ws: &mut tokio_tungstenite::WebSocketStream, - live: db::LiveSegment, + live: db::LiveFrame, start_at_key: bool, ) -> Result { let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);