avoid unbounded channels in live streams

It's cleaner anyway to use `tokio::broadcast::channel` than the list
of callbacks.

Also make it send pings only on long pauses between frames, as when
the camera is disconnected.
This commit is contained in:
Scott Lamb 2024-08-15 06:12:11 -07:00
parent d43e09d959
commit 89f230004e
7 changed files with 73 additions and 81 deletions

View File

@ -654,6 +654,9 @@ However, there are two important differences:
* The `/view.m4s` endpoint always returns a time range that starts with a key frame; * The `/view.m4s` endpoint always returns a time range that starts with a key frame;
`/live.m4s` messages may not include a key frame. `/live.m4s` messages may not include a key frame.
If the caller falls too many frames behind, the connection will drop with an
text message error.
Note: an earlier version of this API used a `multipart/mixed` segment instead, Note: an earlier version of this API used a `multipart/mixed` segment instead,
compatible with the [multipart-stream-js][multipart-stream-js] library. The compatible with the [multipart-stream-js][multipart-stream-js] library. The
problem with this approach is that browsers have low limits on the number of problem with this approach is that browsers have low limits on the number of

12
server/Cargo.lock generated
View File

@ -1204,7 +1204,6 @@ dependencies = [
"tempfile", "tempfile",
"time 0.1.45", "time 0.1.45",
"tokio", "tokio",
"tokio-stream",
"tokio-tungstenite", "tokio-tungstenite",
"toml", "toml",
"tracing", "tracing",
@ -2236,17 +2235,6 @@ dependencies = [
"syn 2.0.48", "syn 2.0.48",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.20.1" version = "0.20.1"

View File

@ -67,7 +67,6 @@ smallvec = { version = "1.7", features = ["union"] }
sync_wrapper = "0.1.0" sync_wrapper = "0.1.0"
time = "0.1" time = "0.1"
tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = "0.1.5"
tokio-tungstenite = "0.20.0" tokio-tungstenite = "0.20.0"
toml = "0.8" toml = "0.8"
tracing = { workspace = true } tracing = { workspace = true }

View File

@ -66,6 +66,13 @@ pub const EXPECTED_SCHEMA_VERSION: i32 = 7;
/// Make it one less than a power of two so that the data structure's size is efficient. /// Make it one less than a power of two so that the data structure's size is efficient.
const VIDEO_INDEX_CACHE_LEN: usize = 1023; const VIDEO_INDEX_CACHE_LEN: usize = 1023;
/// Maximum number of live segments references to keep.
///
/// These should only be 16 bytes each, so they're fairly cheap, but we should
/// have some bound in case subscribers are slow, and anyway it's generally
/// not a good experience for subscribers to fall too far behind.
const LIVE_SEGMENTS_BUF_LEN: usize = 128;
const GET_RECORDING_PLAYBACK_SQL: &str = r#" const GET_RECORDING_PLAYBACK_SQL: &str = r#"
select select
video_index video_index
@ -500,21 +507,22 @@ pub struct Stream {
/// The number of recordings in `uncommitted` which are synced and ready to commit. /// The number of recordings in `uncommitted` which are synced and ready to commit.
synced_recordings: usize, synced_recordings: usize,
on_live_segment: Vec<Box<dyn FnMut(LiveSegment) -> bool + Send>>, live_segments: tokio::sync::broadcast::Sender<LiveFrame>,
} }
/// Bounds of a live view segment. Currently this is a single frame of video. /// Bounds of a live view frame.
///
/// This is used for live stream recordings. The stream id should already be known to the /// This is used for live stream recordings. The stream id should already be known to the
/// subscriber. Note this doesn't actually contain the video, just a reference that can be /// subscriber. Note this doesn't actually contain the video, just a reference that can be
/// looked up within the database. /// looked up within the database.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LiveSegment { pub struct LiveFrame {
pub recording: i32, pub recording: i32,
/// If the segment's one frame is a key frame. /// If the segment's one frame is a key frame.
pub is_key: bool, pub is_key: bool,
/// The pts, relative to the start of the recording, of the start and end of this live segment, /// The pts, relative to the start of the recording, of the start and end of this frame,
/// in 90kHz units. /// in 90kHz units.
pub media_off_90k: Range<i32>, pub media_off_90k: Range<i32>,
} }
@ -823,7 +831,7 @@ impl StreamStateChanger {
cum_runs: 0, cum_runs: 0,
uncommitted: VecDeque::new(), uncommitted: VecDeque::new(),
synced_recordings: 0, synced_recordings: 0,
on_live_segment: Vec::new(), live_segments: tokio::sync::broadcast::channel(LIVE_SEGMENTS_BUF_LEN).0,
}); });
} }
(Entry::Vacant(_), None) => {} (Entry::Vacant(_), None) => {}
@ -962,42 +970,27 @@ impl LockedDatabase {
Ok(()) Ok(())
} }
/// Registers a callback to run on every live segment immediately after it's recorded. /// Returns a watcher for live segments of the given stream.
/// The callback is run with the database lock held, so it must not call back into the database
/// or block. The callback should return false to unregister.
pub fn watch_live( pub fn watch_live(
&mut self, &mut self,
stream_id: i32, stream_id: i32,
cb: Box<dyn FnMut(LiveSegment) -> bool + Send>, ) -> Result<tokio::sync::broadcast::Receiver<LiveFrame>, Error> {
) -> Result<(), Error> {
let s = match self.streams_by_id.get_mut(&stream_id) { let s = match self.streams_by_id.get_mut(&stream_id) {
None => bail!(NotFound, msg("no such stream {stream_id}")), None => bail!(NotFound, msg("no such stream {stream_id}")),
Some(s) => s, Some(s) => s,
}; };
s.on_live_segment.push(cb); Ok(s.live_segments.subscribe())
Ok(())
} }
/// Clears all watches on all streams. pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveFrame) -> Result<(), Error> {
/// Normally watches are self-cleaning: when a segment is sent, the callback returns false if
/// it is no longer interested (typically because hyper has just noticed the client is no
/// longer connected). This doesn't work when the system is shutting down and nothing more is
/// sent, though.
pub fn clear_watches(&mut self) {
for s in self.streams_by_id.values_mut() {
s.on_live_segment.clear();
}
}
pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> {
let s = match self.streams_by_id.get_mut(&stream) { let s = match self.streams_by_id.get_mut(&stream) {
None => bail!(Internal, msg("no such stream {stream}")), None => bail!(Internal, msg("no such stream {stream}")),
Some(s) => s, Some(s) => s,
}; };
// TODO: use std's retain_mut after it's available in our minimum supported Rust version. // Note that `send` will fail if there are no active receivers.
// <https://github.com/rust-lang/rust/issues/48919> // That's fine, so ignore this error.
odds::vec::VecExt::retain_mut(&mut s.on_live_segment, |cb| cb(l.clone())); let _ = s.live_segments.send(l);
Ok(()) Ok(())
} }
@ -1738,7 +1731,7 @@ impl LockedDatabase {
cum_runs: row.get(7)?, cum_runs: row.get(7)?,
uncommitted: VecDeque::new(), uncommitted: VecDeque::new(),
synced_recordings: 0, synced_recordings: 0,
on_live_segment: Vec::new(), live_segments: tokio::sync::broadcast::channel(LIVE_SEGMENTS_BUF_LEN).0,
}, },
); );
c.streams[type_.index()] = Some(id); c.streams[type_.index()] = Some(id);

View File

@ -915,7 +915,7 @@ impl<F: FileWriter> InnerWriter<F> {
db.lock() db.lock()
.send_live_segment( .send_live_segment(
stream_id, stream_id,
db::LiveSegment { db::LiveFrame {
recording: self.id.recording(), recording: self.id.recording(),
is_key, is_key,
media_off_90k: prev_media_duration_90k..media_duration_90k, media_off_90k: prev_media_duration_90k..media_duration_90k,

View File

@ -524,8 +524,6 @@ async fn inner(
.await .await
.map_err(|e| err!(Unknown, source(e)))?; .map_err(|e| err!(Unknown, source(e)))?;
db.lock().clear_watches();
info!("Waiting for HTTP requests to finish."); info!("Waiting for HTTP requests to finish.");
for h in web_handles { for h in web_handles {
h.await h.await

View File

@ -7,8 +7,9 @@
use std::sync::Arc; use std::sync::Arc;
use base::{bail, err, Error}; use base::{bail, err, Error};
use futures::{future::Either, SinkExt, StreamExt}; use futures::SinkExt;
use http::header; use http::header;
use tokio::sync::broadcast::error::RecvError;
use tokio_tungstenite::{tungstenite, WebSocketStream}; use tokio_tungstenite::{tungstenite, WebSocketStream};
use uuid::Uuid; use uuid::Uuid;
@ -16,6 +17,13 @@ use crate::mp4;
use super::{Caller, Service}; use super::{Caller, Service};
/// Interval at which to send keepalives if there are no frames.
///
/// Chrome appears to time out WebSockets after 60 seconds of inactivity.
/// If the camera is disconnected or not sending frames, we'd like to keep
/// the connection open so everything will recover when the camera comes back.
const KEEPALIVE_AFTER_IDLE: tokio::time::Duration = tokio::time::Duration::from_secs(30);
impl Service { impl Service {
pub(super) async fn stream_live_m4s( pub(super) async fn stream_live_m4s(
self: Arc<Self>, self: Arc<Self>,
@ -31,8 +39,7 @@ impl Service {
let stream_id; let stream_id;
let open_id; let open_id;
let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); let mut sub_rx = {
{
let mut db = self.db.lock(); let mut db = self.db.lock();
open_id = match db.open { open_id = match db.open {
None => { None => {
@ -48,45 +55,49 @@ impl Service {
.ok_or_else(|| err!(NotFound, msg("no such camera {uuid}")))?; .ok_or_else(|| err!(NotFound, msg("no such camera {uuid}")))?;
stream_id = camera.streams[stream_type.index()] stream_id = camera.streams[stream_type.index()]
.ok_or_else(|| err!(NotFound, msg("no such stream {uuid}/{stream_type}")))?; .ok_or_else(|| err!(NotFound, msg("no such stream {uuid}/{stream_type}")))?;
db.watch_live( db.watch_live(stream_id).expect("stream_id refed by camera")
stream_id, };
Box::new(move |l| sub_tx.unbounded_send(l).is_ok()),
)
.expect("stream_id refed by camera");
}
let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( let mut keepalive = tokio::time::interval(KEEPALIVE_AFTER_IDLE);
std::time::Duration::new(30, 0), keepalive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
));
let mut combo = futures::stream::select(
sub_rx.map(Either::Left),
keepalive.map(|_| Either::Right(())),
);
// On the first LiveSegment, send all the data from the previous key frame onward. // On the first LiveFrame, send all the data from the previous key frame
// For LiveSegments, it's okay to send a single non-key frame at a time. // onward. Afterward, send a single (often non-key) frame at a time.
let mut start_at_key = true; let mut start_at_key = true;
loop { loop {
let next = combo tokio::select! {
.next() biased;
.await
.unwrap_or_else(|| unreachable!("timer stream never ends")); next = sub_rx.recv() => {
match next { match next {
Either::Left(live) => { Ok(l) => {
if !self keepalive.reset_after(KEEPALIVE_AFTER_IDLE);
.stream_live_m4s_chunk(open_id, stream_id, ws, live, start_at_key) if !self.stream_live_m4s_chunk(
.await? open_id,
{ stream_id,
ws,
l,
start_at_key,
).await? {
return Ok(()); return Ok(());
} }
start_at_key = false; start_at_key = false;
} }
Either::Right(_) => { Err(RecvError::Closed) => {
if ws bail!(Internal, msg("live stream closed unexpectedly"));
.send(tungstenite::Message::Ping(Vec::new())) }
.await Err(RecvError::Lagged(frames)) => {
.is_err() bail!(
{ ResourceExhausted,
msg("subscriber {frames} frames further behind than allowed; \
this typically indicates insufficient bandwidth"),
)
}
}
}
_ = keepalive.tick() => {
if ws.send(tungstenite::Message::Ping(Vec::new())).await.is_err() {
return Ok(()); return Ok(());
} }
} }
@ -101,7 +112,7 @@ impl Service {
open_id: u32, open_id: u32,
stream_id: i32, stream_id: i32,
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>, ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
live: db::LiveSegment, live: db::LiveFrame,
start_at_key: bool, start_at_key: bool,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);