From 3bc552b95004d13223120d1c3fcf5d43a6af83f6 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 13 Apr 2022 14:37:45 -0700 Subject: [PATCH] seamless mid-stream video parameter changes For #217. This handles the recording logic. May still need fixes to playback and/or live stream logic. --- CHANGELOG.md | 3 + server/db/db.rs | 2 +- server/db/writer.rs | 56 ++++++++++------ server/src/cmds/config/cameras.rs | 15 +++-- server/src/mp4.rs | 25 ++++---- server/src/stream.rs | 103 ++++++++++++++++++++---------- server/src/streamer.rs | 71 ++++++++++++-------- 7 files changed, 174 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 588c986..6790868 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ Each release is tagged in Git and on the Docker repository ## unreleased * upgrade to Retina 0.3.9, improving camera interop and diagnostics +* [#217](https://github.com/scottlamb/moonfire-nvr/issues/217): no longer + drop the connection to the camera when it changes video parameters, instead + continuing the run seamlessly. ## `v0.7.3` (2022-03-22) diff --git a/server/db/db.rs b/server/db/db.rs index cd3bf61..7ef67ba 100644 --- a/server/db/db.rs +++ b/server/db/db.rs @@ -156,7 +156,7 @@ impl VideoSampleEntry { } } -#[derive(PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct VideoSampleEntryToInsert { pub data: Vec, pub rfc6381_codec: String, diff --git a/server/db/writer.rs b/server/db/writer.rs index d430fbc..997095e 100644 --- a/server/db/writer.rs +++ b/server/db/writer.rs @@ -615,7 +615,6 @@ pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> { db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, - video_sample_entry_id: i32, state: WriterState, } @@ -634,6 +633,7 @@ struct InnerWriter { r: Arc>, e: recording::SampleIndexEncoder, id: CompositeId, + video_sample_entry_id: i32, hasher: blake3::Hasher, @@ -680,26 +680,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, - video_sample_entry_id: i32, ) -> Self { Writer { dir, db, channel, stream_id, - video_sample_entry_id, state: WriterState::Unopened, } } - /// Opens a new writer. + /// Opens a new recording if not already open. + /// /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for /// correcting this. - fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> { + fn open( + &mut self, + shutdown_rx: &mut base::shutdown::Receiver, + video_sample_entry_id: i32, + ) -> Result<(), Error> { let prev = match self.state { WriterState::Unopened => None, - WriterState::Open(_) => return Ok(()), + WriterState::Open(ref o) => { + if o.video_sample_entry_id != video_sample_entry_id { + bail!("inconsistent video_sample_entry_id"); + } + return Ok(()); + } WriterState::Closed(prev) => Some(prev), }; let (id, r) = self.db.lock().add_recording( @@ -709,7 +717,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { start: prev .map(|p| p.end) .unwrap_or(recording::Time(i64::max_value())), - video_sample_entry_id: self.video_sample_entry_id, + video_sample_entry_id, flags: db::RecordingFlags::Growing as i32, ..Default::default() }, @@ -726,6 +734,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { hasher: blake3::Hasher::new(), local_start: recording::Time(i64::max_value()), unindexed_sample: None, + video_sample_entry_id, }); Ok(()) } @@ -747,8 +756,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { local_time: recording::Time, pts_90k: i64, is_key: bool, + video_sample_entry_id: i32, ) -> Result<(), Error> { - self.open(shutdown_rx)?; + self.open(shutdown_rx, video_sample_entry_id)?; let w = match self.state { WriterState::Open(ref mut w) => w, _ => unreachable!(), @@ -816,9 +826,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { Ok(()) } - /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's - /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, - /// swallowing errors and using a zero duration for the last sample. + /// Cleanly closes a single recording within this writer, using a supplied + /// pts of the next sample for the last sample's duration (if known). + /// + /// The `Writer` may be used again, causing another recording to be created + /// within the same run. + /// + /// If the `Writer` is dropped without `close`, the `Drop` trait impl will + /// close, swallowing errors and using a zero duration for the last sample. pub fn close(&mut self, next_pts: Option, reason: Option) -> Result<(), Error> { self.state = match mem::replace(&mut self.state, WriterState::Unopened) { WriterState::Open(w) => { @@ -1179,13 +1194,7 @@ mod tests { rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); - let mut w = Writer::new( - &h.dir, - &h.db, - &h.channel, - testutil::TEST_STREAM_ID, - video_sample_entry_id, - ); + let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new(|_id| Err(nix::Error::EIO)), @@ -1200,8 +1209,15 @@ mod tests { )); f.expect(MockFileAction::Write(Box::new(|_| Ok(1)))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true) - .unwrap(); + w.write( + &mut h.shutdown_rx, + b"1", + recording::Time(1), + 0, + true, + video_sample_entry_id, + ) + .unwrap(); let e = w .write( diff --git a/server/src/cmds/config/cameras.rs b/server/src/cmds/config/cameras.rs index 55c192e..b1cfb9c 100644 --- a/server/src/cmds/config/cameras.rs +++ b/server/src/cmds/config/cameras.rs @@ -211,7 +211,7 @@ fn press_test_inner( transport: retina::client::Transport, ) -> Result { let _enter = handle.enter(); - let (extra_data, stream) = stream::OPENER.open( + let stream = stream::OPENER.open( "test stream".to_owned(), url, retina::client::SessionOptions::default() @@ -222,10 +222,17 @@ fn press_test_inner( }) .transport(transport), )?; + let video_sample_entry = stream.video_sample_entry(); Ok(format!( - "{}x{} video stream served by tool {:?}", - extra_data.width, - extra_data.height, + "codec: {}\n\ + dimensions: {}x{}\n\ + pixel aspect ratio: {}x{}\n\ + tool: {:?}", + &video_sample_entry.rfc6381_codec, + video_sample_entry.width, + video_sample_entry.height, + video_sample_entry.pasp_h_spacing, + video_sample_entry.pasp_v_spacing, stream.tool(), )) } diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 5d32348..bd1c4e5 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -2278,21 +2278,18 @@ mod tests { } fn copy_mp4_to_db(db: &mut TestDb) { - let (extra_data, input) = - stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let input = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); let mut input: Box = Box::new(input); // 2015-04-26 00:00:00 UTC. const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap(); + let video_sample_entry_id = db + .db + .lock() + .insert_video_sample_entry(input.video_sample_entry().clone()) + .unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); - let mut output = writer::Writer::new( - dir, - &db.db, - &db.syncer_channel, - TEST_STREAM_ID, - video_sample_entry_id, - ); + let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID); // end_pts is the pts of the end of the most recent frame (start + duration). // It's needed because dir::Writer calculates a packet's duration from its pts and the @@ -2321,6 +2318,7 @@ mod tests { frame_time, pkt.pts, pkt.is_key, + video_sample_entry_id, ) .unwrap(); end_pts = Some(pkt.pts + i64::from(pkt.duration)); @@ -2391,9 +2389,8 @@ mod tests { } fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) { - let (orig_extra_data, orig) = - stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); - let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap(); + let orig = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let new = stream::testutil::Mp4Stream::open(new_filename).unwrap(); if pts_offset > 0 { // The mp4 crate doesn't interpret the edit list. Manually inspect it. @@ -2410,7 +2407,7 @@ mod tests { let mut orig: Box = Box::new(orig); let mut new: Box = Box::new(new); - assert_eq!(orig_extra_data, new_extra_data); + assert_eq!(orig.video_sample_entry(), new.video_sample_entry()); let mut final_durations = None; for i in 0.. { let orig_pkt = match orig.next() { diff --git a/server/src/stream.rs b/server/src/stream.rs index 827812c..d820130 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -8,7 +8,7 @@ use failure::format_err; use failure::{bail, Error}; use futures::StreamExt; use retina::client::Demuxed; -use retina::codec::{CodecItem, VideoParameters}; +use retina::codec::CodecItem; use std::pin::Pin; use std::result::Result; use url::Url; @@ -26,7 +26,7 @@ pub trait Opener: Send + Sync { label: String, url: Url, options: retina::client::SessionOptions, - ) -> Result<(db::VideoSampleEntryToInsert, Box), Error>; + ) -> Result, Error>; } pub struct VideoFrame { @@ -38,10 +38,13 @@ pub struct VideoFrame { pub is_key: bool, pub data: Bytes, + + pub new_video_sample_entry: bool, } pub trait Stream: Send { fn tool(&self) -> Option<&retina::client::Tool>; + fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert; fn next(&mut self) -> Result; } @@ -55,22 +58,20 @@ impl Opener for RealOpener { label: String, url: Url, options: retina::client::SessionOptions, - ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { + ) -> Result, Error> { let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); let rt_handle = tokio::runtime::Handle::current(); - let (inner, video_params, first_frame) = rt_handle + let (inner, first_frame) = rt_handle .block_on(rt_handle.spawn(tokio::time::timeout( RETINA_TIMEOUT, RetinaStreamInner::play(label, url, options), ))) .expect("RetinaStream::play task panicked, see earlier error")??; - let extra_data = h264::parse_extra_data(video_params.extra_data())?; - let stream = Box::new(RetinaStream { + Ok(Box::new(RetinaStream { inner: Some(inner), rt_handle, first_frame: Some(first_frame), - }); - Ok((extra_data, stream)) + })) } } @@ -102,6 +103,7 @@ struct RetinaStream { struct RetinaStreamInner { label: String, session: Demuxed, + video_sample_entry: db::VideoSampleEntryToInsert, } impl RetinaStreamInner { @@ -110,7 +112,7 @@ impl RetinaStreamInner { label: String, url: Url, options: retina::client::SessionOptions, - ) -> Result<(Box, Box, retina::codec::VideoFrame), Error> { + ) -> Result<(Box, retina::codec::VideoFrame), Error> { let mut session = retina::client::Session::describe(url, options).await?; log::debug!("connected to {:?}, tool {:?}", &label, session.tool()); let (video_i, mut video_params) = session @@ -151,12 +153,17 @@ impl RetinaStreamInner { Some(Ok(_)) => {} } }; - let self_ = Box::new(Self { label, session }); - Ok(( - self_, - video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?, - first_frame, - )) + let video_sample_entry = h264::parse_extra_data( + video_params + .ok_or_else(|| format_err!("couldn't find H.264 parameters"))? + .extra_data(), + )?; + let self_ = Box::new(Self { + label, + session, + video_sample_entry, + }); + Ok((self_, first_frame)) } /// Fetches a non-initial frame. @@ -167,10 +174,6 @@ impl RetinaStreamInner { match Pin::new(&mut self.session).next().await.transpose()? { None => bail!("end of stream"), Some(CodecItem::VideoFrame(v)) => { - if let Some(p) = v.new_parameters { - // TODO: we could start a new recording without dropping the connection. - bail!("parameter change: {:?}", p); - } if v.loss > 0 { log::warn!( "{}: lost {} RTP packets @ {}", @@ -192,25 +195,48 @@ impl Stream for RetinaStream { self.inner.as_ref().unwrap().session.tool() } + fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert { + &self.inner.as_ref().unwrap().video_sample_entry + } + fn next(&mut self) -> Result { - let frame = self.first_frame.take().map(Ok).unwrap_or_else(move || { - let inner = self.inner.take().unwrap(); - let (inner, frame) = self - .rt_handle - .block_on(self.rt_handle.spawn(tokio::time::timeout( - RETINA_TIMEOUT, - inner.fetch_next_frame(), - ))) - .expect("fetch_next_frame task panicked, see earlier error") - .map_err(|_| format_err!("timeout getting next frame"))??; - self.inner = Some(inner); - Ok::<_, failure::Error>(frame) - })?; + let (frame, new_video_sample_entry) = self + .first_frame + .take() + .map(|f| Ok((f, false))) + .unwrap_or_else(move || { + let inner = self.inner.take().unwrap(); + let (mut inner, mut frame) = self + .rt_handle + .block_on(self.rt_handle.spawn(tokio::time::timeout( + RETINA_TIMEOUT, + inner.fetch_next_frame(), + ))) + .expect("fetch_next_frame task panicked, see earlier error") + .map_err(|_| format_err!("timeout getting next frame"))??; + let mut new_video_sample_entry = false; + if let Some(p) = frame.new_parameters.take() { + let video_sample_entry = h264::parse_extra_data(p.extra_data())?; + if video_sample_entry != inner.video_sample_entry { + log::debug!( + "{}: parameter change:\nold: {:?}\nnew: {:?}", + &inner.label, + &inner.video_sample_entry, + &video_sample_entry + ); + inner.video_sample_entry = video_sample_entry; + new_video_sample_entry = true; + } + }; + self.inner = Some(inner); + Ok::<_, failure::Error>((frame, new_video_sample_entry)) + })?; Ok(VideoFrame { pts: frame.timestamp.elapsed(), duration: 0, is_key: frame.is_random_access_point, data: frame.into_data(), + new_video_sample_entry, }) } } @@ -225,11 +251,12 @@ pub mod testutil { reader: mp4::Mp4Reader>>, h264_track_id: u32, next_sample_id: u32, + video_sample_entry: db::VideoSampleEntryToInsert, } impl Mp4Stream { /// Opens a stream, with a return matching that expected by [`Opener`]. - pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> { + pub fn open(path: &str) -> Result { let f = std::fs::read(path)?; let len = f.len(); let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?; @@ -241,14 +268,15 @@ pub mod testutil { None => bail!("expected a H.264 track"), Some(t) => t, }; - let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?; + let video_sample_entry = h264::parse_extra_data(&h264_track.extra_data()?[..])?; let h264_track_id = h264_track.track_id(); let stream = Mp4Stream { reader, h264_track_id, next_sample_id: 1, + video_sample_entry, }; - Ok((extra_data, stream)) + Ok(stream) } pub fn duration(&self) -> u64 { @@ -282,7 +310,12 @@ pub mod testutil { duration: sample.duration as i32, is_key: sample.is_sync, data: sample.bytes, + new_video_sample_entry: false, }) } + + fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert { + &self.video_sample_entry + } } } diff --git a/server/src/streamer.rs b/server/src/streamer.rs index a2b7264..26d7be9 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -159,7 +159,7 @@ where } } - let (extra_data, mut stream) = { + let mut stream = { let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); self.opener.open( self.short_name.clone(), @@ -178,34 +178,33 @@ where )? }; let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); - let video_sample_entry_id = { + let mut video_sample_entry_id = { let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); - self.db.lock().insert_video_sample_entry(extra_data)? + self.db + .lock() + .insert_video_sample_entry(stream.video_sample_entry().clone())? }; let mut seen_key_frame = false; - // Seconds since epoch at which to next rotate. + // Seconds since epoch at which to next rotate. See comment at start + // of while loop. let mut rotate: Option = None; - let mut w = writer::Writer::new( - &self.dir, - &self.db, - &self.syncer_channel, - self.stream_id, - video_sample_entry_id, - ); + let mut w = writer::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id); while self.shutdown_rx.check().is_ok() { - let pkt = { + // `rotate` should now be set iff `w` has an open recording. + + let frame = { let _t = TimerGuard::new(&clocks, || "getting next packet"); stream.next() }; - let pkt = match pkt { - Ok(p) => p, + let frame = match frame { + Ok(f) => f, Err(e) => { let _ = w.close(None, Some(e.to_string())); return Err(e); } }; - if !seen_key_frame && !pkt.is_key { + if !seen_key_frame && !frame.is_key { continue; } else if !seen_key_frame { debug!("{}: have first key frame", self.short_name); @@ -214,10 +213,24 @@ where let frame_realtime = clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); rotate = if let Some(r) = rotate { - if frame_realtime.sec > r && pkt.is_key { - trace!("{}: write on normal rotation", self.short_name); + if frame_realtime.sec > r && frame.is_key { + trace!("{}: close on normal rotation", self.short_name); let _t = TimerGuard::new(&clocks, || "closing writer"); - w.close(Some(pkt.pts), None)?; + w.close(Some(frame.pts), None)?; + None + } else if frame.new_video_sample_entry { + if !frame.is_key { + bail!("parameter change on non-key frame"); + } + trace!("{}: close on parameter change", self.short_name); + video_sample_entry_id = { + let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); + self.db + .lock() + .insert_video_sample_entry(stream.video_sample_entry().clone())? + }; + let _t = TimerGuard::new(&clocks, || "closing writer"); + w.close(Some(frame.pts), None)?; None } else { Some(r) @@ -249,13 +262,14 @@ where r } }; - let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len())); + let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", frame.data.len())); w.write( &mut self.shutdown_rx, - &pkt.data[..], + &frame.data[..], local_time, - pkt.pts, - pkt.is_key, + frame.pts, + frame.is_key, + video_sample_entry_id, )?; rotate = Some(r); } @@ -314,6 +328,10 @@ mod tests { self.inner.tool() } + fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert { + self.inner.video_sample_entry() + } + fn next(&mut self) -> Result { if self.pkts_left == 0 { bail!("end of stream"); @@ -355,7 +373,7 @@ mod tests { struct MockOpener { expected_url: url::Url, - streams: Mutex)>>, + streams: Mutex>>, shutdown_tx: Mutex>, } @@ -365,7 +383,7 @@ mod tests { _label: String, url: url::Url, _options: retina::client::SessionOptions, - ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { + ) -> Result, Error> { assert_eq!(&url, &self.expected_url); let mut l = self.streams.lock(); match l.pop() { @@ -412,8 +430,7 @@ mod tests { let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC - let (extra_data, stream) = - stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let stream = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); let mut stream = ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream)); stream.ts_offset = 123456; // starting pts of the input should be irrelevant @@ -422,7 +439,7 @@ mod tests { let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let opener = MockOpener { expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(), - streams: Mutex::new(vec![(extra_data, Box::new(stream))]), + streams: Mutex::new(vec![Box::new(stream)]), shutdown_tx: Mutex::new(Some(shutdown_tx)), }; let db = testutil::TestDb::new(clocks.clone());