seamless mid-stream video parameter changes

For #217. This handles the recording logic. May still need fixes to
playback and/or live stream logic.
This commit is contained in:
Scott Lamb 2022-04-13 14:37:45 -07:00
parent 71d3f2a946
commit 3bc552b950
7 changed files with 174 additions and 101 deletions

View File

@ -9,6 +9,9 @@ Each release is tagged in Git and on the Docker repository
## unreleased
* upgrade to Retina 0.3.9, improving camera interop and diagnostics
* [#217](https://github.com/scottlamb/moonfire-nvr/issues/217): no longer
drop the connection to the camera when it changes video parameters, instead
continuing the run seamlessly.
## `v0.7.3` (2022-03-22)

View File

@ -156,7 +156,7 @@ impl VideoSampleEntry {
}
}
#[derive(PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct VideoSampleEntryToInsert {
pub data: Vec<u8>,
pub rfc6381_codec: String,

View File

@ -615,7 +615,6 @@ pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
video_sample_entry_id: i32,
state: WriterState<D::File>,
}
@ -634,6 +633,7 @@ struct InnerWriter<F: FileWriter> {
r: Arc<Mutex<db::RecordingToInsert>>,
e: recording::SampleIndexEncoder,
id: CompositeId,
video_sample_entry_id: i32,
hasher: blake3::Hasher,
@ -680,26 +680,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
video_sample_entry_id: i32,
) -> Self {
Writer {
dir,
db,
channel,
stream_id,
video_sample_entry_id,
state: WriterState::Unopened,
}
}
/// Opens a new writer.
/// Opens a new recording if not already open.
///
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
/// correcting this.
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
fn open(
&mut self,
shutdown_rx: &mut base::shutdown::Receiver,
video_sample_entry_id: i32,
) -> Result<(), Error> {
let prev = match self.state {
WriterState::Unopened => None,
WriterState::Open(_) => return Ok(()),
WriterState::Open(ref o) => {
if o.video_sample_entry_id != video_sample_entry_id {
bail!("inconsistent video_sample_entry_id");
}
return Ok(());
}
WriterState::Closed(prev) => Some(prev),
};
let (id, r) = self.db.lock().add_recording(
@ -709,7 +717,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
start: prev
.map(|p| p.end)
.unwrap_or(recording::Time(i64::max_value())),
video_sample_entry_id: self.video_sample_entry_id,
video_sample_entry_id,
flags: db::RecordingFlags::Growing as i32,
..Default::default()
},
@ -726,6 +734,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
hasher: blake3::Hasher::new(),
local_start: recording::Time(i64::max_value()),
unindexed_sample: None,
video_sample_entry_id,
});
Ok(())
}
@ -747,8 +756,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
local_time: recording::Time,
pts_90k: i64,
is_key: bool,
video_sample_entry_id: i32,
) -> Result<(), Error> {
self.open(shutdown_rx)?;
self.open(shutdown_rx, video_sample_entry_id)?;
let w = match self.state {
WriterState::Open(ref mut w) => w,
_ => unreachable!(),
@ -816,9 +826,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
Ok(())
}
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
/// Cleanly closes a single recording within this writer, using a supplied
/// pts of the next sample for the last sample's duration (if known).
///
/// The `Writer` may be used again, causing another recording to be created
/// within the same run.
///
/// If the `Writer` is dropped without `close`, the `Drop` trait impl will
/// close, swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => {
@ -1179,13 +1194,7 @@ mod tests {
rfc6381_codec: "avc1.000000".to_owned(),
})
.unwrap();
let mut w = Writer::new(
&h.dir,
&h.db,
&h.channel,
testutil::TEST_STREAM_ID,
video_sample_entry_id,
);
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
h.dir.expect(MockDirAction::Create(
CompositeId::new(1, 0),
Box::new(|_id| Err(nix::Error::EIO)),
@ -1200,8 +1209,15 @@ mod tests {
));
f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true)
.unwrap();
w.write(
&mut h.shutdown_rx,
b"1",
recording::Time(1),
0,
true,
video_sample_entry_id,
)
.unwrap();
let e = w
.write(

View File

@ -211,7 +211,7 @@ fn press_test_inner(
transport: retina::client::Transport,
) -> Result<String, Error> {
let _enter = handle.enter();
let (extra_data, stream) = stream::OPENER.open(
let stream = stream::OPENER.open(
"test stream".to_owned(),
url,
retina::client::SessionOptions::default()
@ -222,10 +222,17 @@ fn press_test_inner(
})
.transport(transport),
)?;
let video_sample_entry = stream.video_sample_entry();
Ok(format!(
"{}x{} video stream served by tool {:?}",
extra_data.width,
extra_data.height,
"codec: {}\n\
dimensions: {}x{}\n\
pixel aspect ratio: {}x{}\n\
tool: {:?}",
&video_sample_entry.rfc6381_codec,
video_sample_entry.width,
video_sample_entry.height,
video_sample_entry.pasp_h_spacing,
video_sample_entry.pasp_v_spacing,
stream.tool(),
))
}

View File

@ -2278,21 +2278,18 @@ mod tests {
}
fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
let (extra_data, input) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let input = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut input: Box<dyn stream::Stream> = Box::new(input);
// 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap();
let video_sample_entry_id = db
.db
.lock()
.insert_video_sample_entry(input.video_sample_entry().clone())
.unwrap();
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
let mut output = writer::Writer::new(
dir,
&db.db,
&db.syncer_channel,
TEST_STREAM_ID,
video_sample_entry_id,
);
let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID);
// end_pts is the pts of the end of the most recent frame (start + duration).
// It's needed because dir::Writer calculates a packet's duration from its pts and the
@ -2321,6 +2318,7 @@ mod tests {
frame_time,
pkt.pts,
pkt.is_key,
video_sample_entry_id,
)
.unwrap();
end_pts = Some(pkt.pts + i64::from(pkt.duration));
@ -2391,9 +2389,8 @@ mod tests {
}
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let (orig_extra_data, orig) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap();
let orig = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let new = stream::testutil::Mp4Stream::open(new_filename).unwrap();
if pts_offset > 0 {
// The mp4 crate doesn't interpret the edit list. Manually inspect it.
@ -2410,7 +2407,7 @@ mod tests {
let mut orig: Box<dyn stream::Stream> = Box::new(orig);
let mut new: Box<dyn stream::Stream> = Box::new(new);
assert_eq!(orig_extra_data, new_extra_data);
assert_eq!(orig.video_sample_entry(), new.video_sample_entry());
let mut final_durations = None;
for i in 0.. {
let orig_pkt = match orig.next() {

View File

@ -8,7 +8,7 @@ use failure::format_err;
use failure::{bail, Error};
use futures::StreamExt;
use retina::client::Demuxed;
use retina::codec::{CodecItem, VideoParameters};
use retina::codec::CodecItem;
use std::pin::Pin;
use std::result::Result;
use url::Url;
@ -26,7 +26,7 @@ pub trait Opener: Send + Sync {
label: String,
url: Url,
options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error>;
) -> Result<Box<dyn Stream>, Error>;
}
pub struct VideoFrame {
@ -38,10 +38,13 @@ pub struct VideoFrame {
pub is_key: bool,
pub data: Bytes,
pub new_video_sample_entry: bool,
}
pub trait Stream: Send {
fn tool(&self) -> Option<&retina::client::Tool>;
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert;
fn next(&mut self) -> Result<VideoFrame, Error>;
}
@ -55,22 +58,20 @@ impl Opener for RealOpener {
label: String,
url: Url,
options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> {
) -> Result<Box<dyn Stream>, Error> {
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
let rt_handle = tokio::runtime::Handle::current();
let (inner, video_params, first_frame) = rt_handle
let (inner, first_frame) = rt_handle
.block_on(rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT,
RetinaStreamInner::play(label, url, options),
)))
.expect("RetinaStream::play task panicked, see earlier error")??;
let extra_data = h264::parse_extra_data(video_params.extra_data())?;
let stream = Box::new(RetinaStream {
Ok(Box::new(RetinaStream {
inner: Some(inner),
rt_handle,
first_frame: Some(first_frame),
});
Ok((extra_data, stream))
}))
}
}
@ -102,6 +103,7 @@ struct RetinaStream {
struct RetinaStreamInner {
label: String,
session: Demuxed,
video_sample_entry: db::VideoSampleEntryToInsert,
}
impl RetinaStreamInner {
@ -110,7 +112,7 @@ impl RetinaStreamInner {
label: String,
url: Url,
options: retina::client::SessionOptions,
) -> Result<(Box<Self>, Box<VideoParameters>, retina::codec::VideoFrame), Error> {
) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> {
let mut session = retina::client::Session::describe(url, options).await?;
log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
let (video_i, mut video_params) = session
@ -151,12 +153,17 @@ impl RetinaStreamInner {
Some(Ok(_)) => {}
}
};
let self_ = Box::new(Self { label, session });
Ok((
self_,
video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?,
first_frame,
))
let video_sample_entry = h264::parse_extra_data(
video_params
.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?
.extra_data(),
)?;
let self_ = Box::new(Self {
label,
session,
video_sample_entry,
});
Ok((self_, first_frame))
}
/// Fetches a non-initial frame.
@ -167,10 +174,6 @@ impl RetinaStreamInner {
match Pin::new(&mut self.session).next().await.transpose()? {
None => bail!("end of stream"),
Some(CodecItem::VideoFrame(v)) => {
if let Some(p) = v.new_parameters {
// TODO: we could start a new recording without dropping the connection.
bail!("parameter change: {:?}", p);
}
if v.loss > 0 {
log::warn!(
"{}: lost {} RTP packets @ {}",
@ -192,25 +195,48 @@ impl Stream for RetinaStream {
self.inner.as_ref().unwrap().session.tool()
}
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
&self.inner.as_ref().unwrap().video_sample_entry
}
fn next(&mut self) -> Result<VideoFrame, Error> {
let frame = self.first_frame.take().map(Ok).unwrap_or_else(move || {
let inner = self.inner.take().unwrap();
let (inner, frame) = self
.rt_handle
.block_on(self.rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT,
inner.fetch_next_frame(),
)))
.expect("fetch_next_frame task panicked, see earlier error")
.map_err(|_| format_err!("timeout getting next frame"))??;
self.inner = Some(inner);
Ok::<_, failure::Error>(frame)
})?;
let (frame, new_video_sample_entry) = self
.first_frame
.take()
.map(|f| Ok((f, false)))
.unwrap_or_else(move || {
let inner = self.inner.take().unwrap();
let (mut inner, mut frame) = self
.rt_handle
.block_on(self.rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT,
inner.fetch_next_frame(),
)))
.expect("fetch_next_frame task panicked, see earlier error")
.map_err(|_| format_err!("timeout getting next frame"))??;
let mut new_video_sample_entry = false;
if let Some(p) = frame.new_parameters.take() {
let video_sample_entry = h264::parse_extra_data(p.extra_data())?;
if video_sample_entry != inner.video_sample_entry {
log::debug!(
"{}: parameter change:\nold: {:?}\nnew: {:?}",
&inner.label,
&inner.video_sample_entry,
&video_sample_entry
);
inner.video_sample_entry = video_sample_entry;
new_video_sample_entry = true;
}
};
self.inner = Some(inner);
Ok::<_, failure::Error>((frame, new_video_sample_entry))
})?;
Ok(VideoFrame {
pts: frame.timestamp.elapsed(),
duration: 0,
is_key: frame.is_random_access_point,
data: frame.into_data(),
new_video_sample_entry,
})
}
}
@ -225,11 +251,12 @@ pub mod testutil {
reader: mp4::Mp4Reader<Cursor<Vec<u8>>>,
h264_track_id: u32,
next_sample_id: u32,
video_sample_entry: db::VideoSampleEntryToInsert,
}
impl Mp4Stream {
/// Opens a stream, with a return matching that expected by [`Opener`].
pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> {
pub fn open(path: &str) -> Result<Self, Error> {
let f = std::fs::read(path)?;
let len = f.len();
let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?;
@ -241,14 +268,15 @@ pub mod testutil {
None => bail!("expected a H.264 track"),
Some(t) => t,
};
let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
let video_sample_entry = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
let h264_track_id = h264_track.track_id();
let stream = Mp4Stream {
reader,
h264_track_id,
next_sample_id: 1,
video_sample_entry,
};
Ok((extra_data, stream))
Ok(stream)
}
pub fn duration(&self) -> u64 {
@ -282,7 +310,12 @@ pub mod testutil {
duration: sample.duration as i32,
is_key: sample.is_sync,
data: sample.bytes,
new_video_sample_entry: false,
})
}
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
&self.video_sample_entry
}
}
}

View File

@ -159,7 +159,7 @@ where
}
}
let (extra_data, mut stream) = {
let mut stream = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open(
self.short_name.clone(),
@ -178,34 +178,33 @@ where
)?
};
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
let video_sample_entry_id = {
let mut video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db.lock().insert_video_sample_entry(extra_data)?
self.db
.lock()
.insert_video_sample_entry(stream.video_sample_entry().clone())?
};
let mut seen_key_frame = false;
// Seconds since epoch at which to next rotate.
// Seconds since epoch at which to next rotate. See comment at start
// of while loop.
let mut rotate: Option<i64> = None;
let mut w = writer::Writer::new(
&self.dir,
&self.db,
&self.syncer_channel,
self.stream_id,
video_sample_entry_id,
);
let mut w = writer::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id);
while self.shutdown_rx.check().is_ok() {
let pkt = {
// `rotate` should now be set iff `w` has an open recording.
let frame = {
let _t = TimerGuard::new(&clocks, || "getting next packet");
stream.next()
};
let pkt = match pkt {
Ok(p) => p,
let frame = match frame {
Ok(f) => f,
Err(e) => {
let _ = w.close(None, Some(e.to_string()));
return Err(e);
}
};
if !seen_key_frame && !pkt.is_key {
if !seen_key_frame && !frame.is_key {
continue;
} else if !seen_key_frame {
debug!("{}: have first key frame", self.short_name);
@ -214,10 +213,24 @@ where
let frame_realtime = clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime);
rotate = if let Some(r) = rotate {
if frame_realtime.sec > r && pkt.is_key {
trace!("{}: write on normal rotation", self.short_name);
if frame_realtime.sec > r && frame.is_key {
trace!("{}: close on normal rotation", self.short_name);
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(pkt.pts), None)?;
w.close(Some(frame.pts), None)?;
None
} else if frame.new_video_sample_entry {
if !frame.is_key {
bail!("parameter change on non-key frame");
}
trace!("{}: close on parameter change", self.short_name);
video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db
.lock()
.insert_video_sample_entry(stream.video_sample_entry().clone())?
};
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(frame.pts), None)?;
None
} else {
Some(r)
@ -249,13 +262,14 @@ where
r
}
};
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len()));
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", frame.data.len()));
w.write(
&mut self.shutdown_rx,
&pkt.data[..],
&frame.data[..],
local_time,
pkt.pts,
pkt.is_key,
frame.pts,
frame.is_key,
video_sample_entry_id,
)?;
rotate = Some(r);
}
@ -314,6 +328,10 @@ mod tests {
self.inner.tool()
}
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
self.inner.video_sample_entry()
}
fn next(&mut self) -> Result<stream::VideoFrame, Error> {
if self.pkts_left == 0 {
bail!("end of stream");
@ -355,7 +373,7 @@ mod tests {
struct MockOpener {
expected_url: url::Url,
streams: Mutex<Vec<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>)>>,
streams: Mutex<Vec<Box<dyn stream::Stream>>>,
shutdown_tx: Mutex<Option<base::shutdown::Sender>>,
}
@ -365,7 +383,7 @@ mod tests {
_label: String,
url: url::Url,
_options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>), Error> {
) -> Result<Box<dyn stream::Stream>, Error> {
assert_eq!(&url, &self.expected_url);
let mut l = self.streams.lock();
match l.pop() {
@ -412,8 +430,7 @@ mod tests {
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
let (extra_data, stream) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let stream = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut stream =
ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream));
stream.ts_offset = 123456; // starting pts of the input should be irrelevant
@ -422,7 +439,7 @@ mod tests {
let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
let opener = MockOpener {
expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(),
streams: Mutex::new(vec![(extra_data, Box::new(stream))]),
streams: Mutex::new(vec![Box::new(stream)]),
shutdown_tx: Mutex::new(Some(shutdown_tx)),
};
let db = testutil::TestDb::new(clocks.clone());