seamless mid-stream video parameter changes

For #217. This handles the recording logic. May still need fixes to
playback and/or live stream logic.
This commit is contained in:
Scott Lamb 2022-04-13 14:37:45 -07:00
parent 71d3f2a946
commit 3bc552b950
7 changed files with 174 additions and 101 deletions

View File

@ -9,6 +9,9 @@ Each release is tagged in Git and on the Docker repository
## unreleased ## unreleased
* upgrade to Retina 0.3.9, improving camera interop and diagnostics * upgrade to Retina 0.3.9, improving camera interop and diagnostics
* [#217](https://github.com/scottlamb/moonfire-nvr/issues/217): no longer
drop the connection to the camera when it changes video parameters, instead
continuing the run seamlessly.
## `v0.7.3` (2022-03-22) ## `v0.7.3` (2022-03-22)

View File

@ -156,7 +156,7 @@ impl VideoSampleEntry {
} }
} }
#[derive(PartialEq, Eq)] #[derive(Clone, PartialEq, Eq)]
pub struct VideoSampleEntryToInsert { pub struct VideoSampleEntryToInsert {
pub data: Vec<u8>, pub data: Vec<u8>,
pub rfc6381_codec: String, pub rfc6381_codec: String,

View File

@ -615,7 +615,6 @@ pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
db: &'a db::Database<C>, db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>, channel: &'a SyncerChannel<D::File>,
stream_id: i32, stream_id: i32,
video_sample_entry_id: i32,
state: WriterState<D::File>, state: WriterState<D::File>,
} }
@ -634,6 +633,7 @@ struct InnerWriter<F: FileWriter> {
r: Arc<Mutex<db::RecordingToInsert>>, r: Arc<Mutex<db::RecordingToInsert>>,
e: recording::SampleIndexEncoder, e: recording::SampleIndexEncoder,
id: CompositeId, id: CompositeId,
video_sample_entry_id: i32,
hasher: blake3::Hasher, hasher: blake3::Hasher,
@ -680,26 +680,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
db: &'a db::Database<C>, db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>, channel: &'a SyncerChannel<D::File>,
stream_id: i32, stream_id: i32,
video_sample_entry_id: i32,
) -> Self { ) -> Self {
Writer { Writer {
dir, dir,
db, db,
channel, channel,
stream_id, stream_id,
video_sample_entry_id,
state: WriterState::Unopened, state: WriterState::Unopened,
} }
} }
/// Opens a new writer. /// Opens a new recording if not already open.
///
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
/// correcting this. /// correcting this.
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> { fn open(
&mut self,
shutdown_rx: &mut base::shutdown::Receiver,
video_sample_entry_id: i32,
) -> Result<(), Error> {
let prev = match self.state { let prev = match self.state {
WriterState::Unopened => None, WriterState::Unopened => None,
WriterState::Open(_) => return Ok(()), WriterState::Open(ref o) => {
if o.video_sample_entry_id != video_sample_entry_id {
bail!("inconsistent video_sample_entry_id");
}
return Ok(());
}
WriterState::Closed(prev) => Some(prev), WriterState::Closed(prev) => Some(prev),
}; };
let (id, r) = self.db.lock().add_recording( let (id, r) = self.db.lock().add_recording(
@ -709,7 +717,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
start: prev start: prev
.map(|p| p.end) .map(|p| p.end)
.unwrap_or(recording::Time(i64::max_value())), .unwrap_or(recording::Time(i64::max_value())),
video_sample_entry_id: self.video_sample_entry_id, video_sample_entry_id,
flags: db::RecordingFlags::Growing as i32, flags: db::RecordingFlags::Growing as i32,
..Default::default() ..Default::default()
}, },
@ -726,6 +734,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
hasher: blake3::Hasher::new(), hasher: blake3::Hasher::new(),
local_start: recording::Time(i64::max_value()), local_start: recording::Time(i64::max_value()),
unindexed_sample: None, unindexed_sample: None,
video_sample_entry_id,
}); });
Ok(()) Ok(())
} }
@ -747,8 +756,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
local_time: recording::Time, local_time: recording::Time,
pts_90k: i64, pts_90k: i64,
is_key: bool, is_key: bool,
video_sample_entry_id: i32,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.open(shutdown_rx)?; self.open(shutdown_rx, video_sample_entry_id)?;
let w = match self.state { let w = match self.state {
WriterState::Open(ref mut w) => w, WriterState::Open(ref mut w) => w,
_ => unreachable!(), _ => unreachable!(),
@ -816,9 +826,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
Ok(()) Ok(())
} }
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// Cleanly closes a single recording within this writer, using a supplied
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// pts of the next sample for the last sample's duration (if known).
/// swallowing errors and using a zero duration for the last sample. ///
/// The `Writer` may be used again, causing another recording to be created
/// within the same run.
///
/// If the `Writer` is dropped without `close`, the `Drop` trait impl will
/// close, swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> { pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) { self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => { WriterState::Open(w) => {
@ -1179,13 +1194,7 @@ mod tests {
rfc6381_codec: "avc1.000000".to_owned(), rfc6381_codec: "avc1.000000".to_owned(),
}) })
.unwrap(); .unwrap();
let mut w = Writer::new( let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
&h.dir,
&h.db,
&h.channel,
testutil::TEST_STREAM_ID,
video_sample_entry_id,
);
h.dir.expect(MockDirAction::Create( h.dir.expect(MockDirAction::Create(
CompositeId::new(1, 0), CompositeId::new(1, 0),
Box::new(|_id| Err(nix::Error::EIO)), Box::new(|_id| Err(nix::Error::EIO)),
@ -1200,8 +1209,15 @@ mod tests {
)); ));
f.expect(MockFileAction::Write(Box::new(|_| Ok(1)))); f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true) w.write(
.unwrap(); &mut h.shutdown_rx,
b"1",
recording::Time(1),
0,
true,
video_sample_entry_id,
)
.unwrap();
let e = w let e = w
.write( .write(

View File

@ -211,7 +211,7 @@ fn press_test_inner(
transport: retina::client::Transport, transport: retina::client::Transport,
) -> Result<String, Error> { ) -> Result<String, Error> {
let _enter = handle.enter(); let _enter = handle.enter();
let (extra_data, stream) = stream::OPENER.open( let stream = stream::OPENER.open(
"test stream".to_owned(), "test stream".to_owned(),
url, url,
retina::client::SessionOptions::default() retina::client::SessionOptions::default()
@ -222,10 +222,17 @@ fn press_test_inner(
}) })
.transport(transport), .transport(transport),
)?; )?;
let video_sample_entry = stream.video_sample_entry();
Ok(format!( Ok(format!(
"{}x{} video stream served by tool {:?}", "codec: {}\n\
extra_data.width, dimensions: {}x{}\n\
extra_data.height, pixel aspect ratio: {}x{}\n\
tool: {:?}",
&video_sample_entry.rfc6381_codec,
video_sample_entry.width,
video_sample_entry.height,
video_sample_entry.pasp_h_spacing,
video_sample_entry.pasp_v_spacing,
stream.tool(), stream.tool(),
)) ))
} }

View File

@ -2278,21 +2278,18 @@ mod tests {
} }
fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) { fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
let (extra_data, input) = let input = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut input: Box<dyn stream::Stream> = Box::new(input); let mut input: Box<dyn stream::Stream> = Box::new(input);
// 2015-04-26 00:00:00 UTC. // 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap(); let video_sample_entry_id = db
.db
.lock()
.insert_video_sample_entry(input.video_sample_entry().clone())
.unwrap();
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
let mut output = writer::Writer::new( let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID);
dir,
&db.db,
&db.syncer_channel,
TEST_STREAM_ID,
video_sample_entry_id,
);
// end_pts is the pts of the end of the most recent frame (start + duration). // end_pts is the pts of the end of the most recent frame (start + duration).
// It's needed because dir::Writer calculates a packet's duration from its pts and the // It's needed because dir::Writer calculates a packet's duration from its pts and the
@ -2321,6 +2318,7 @@ mod tests {
frame_time, frame_time,
pkt.pts, pkt.pts,
pkt.is_key, pkt.is_key,
video_sample_entry_id,
) )
.unwrap(); .unwrap();
end_pts = Some(pkt.pts + i64::from(pkt.duration)); end_pts = Some(pkt.pts + i64::from(pkt.duration));
@ -2391,9 +2389,8 @@ mod tests {
} }
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) { fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let (orig_extra_data, orig) = let orig = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); let new = stream::testutil::Mp4Stream::open(new_filename).unwrap();
let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap();
if pts_offset > 0 { if pts_offset > 0 {
// The mp4 crate doesn't interpret the edit list. Manually inspect it. // The mp4 crate doesn't interpret the edit list. Manually inspect it.
@ -2410,7 +2407,7 @@ mod tests {
let mut orig: Box<dyn stream::Stream> = Box::new(orig); let mut orig: Box<dyn stream::Stream> = Box::new(orig);
let mut new: Box<dyn stream::Stream> = Box::new(new); let mut new: Box<dyn stream::Stream> = Box::new(new);
assert_eq!(orig_extra_data, new_extra_data); assert_eq!(orig.video_sample_entry(), new.video_sample_entry());
let mut final_durations = None; let mut final_durations = None;
for i in 0.. { for i in 0.. {
let orig_pkt = match orig.next() { let orig_pkt = match orig.next() {

View File

@ -8,7 +8,7 @@ use failure::format_err;
use failure::{bail, Error}; use failure::{bail, Error};
use futures::StreamExt; use futures::StreamExt;
use retina::client::Demuxed; use retina::client::Demuxed;
use retina::codec::{CodecItem, VideoParameters}; use retina::codec::CodecItem;
use std::pin::Pin; use std::pin::Pin;
use std::result::Result; use std::result::Result;
use url::Url; use url::Url;
@ -26,7 +26,7 @@ pub trait Opener: Send + Sync {
label: String, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error>; ) -> Result<Box<dyn Stream>, Error>;
} }
pub struct VideoFrame { pub struct VideoFrame {
@ -38,10 +38,13 @@ pub struct VideoFrame {
pub is_key: bool, pub is_key: bool,
pub data: Bytes, pub data: Bytes,
pub new_video_sample_entry: bool,
} }
pub trait Stream: Send { pub trait Stream: Send {
fn tool(&self) -> Option<&retina::client::Tool>; fn tool(&self) -> Option<&retina::client::Tool>;
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert;
fn next(&mut self) -> Result<VideoFrame, Error>; fn next(&mut self) -> Result<VideoFrame, Error>;
} }
@ -55,22 +58,20 @@ impl Opener for RealOpener {
label: String, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> { ) -> Result<Box<dyn Stream>, Error> {
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
let rt_handle = tokio::runtime::Handle::current(); let rt_handle = tokio::runtime::Handle::current();
let (inner, video_params, first_frame) = rt_handle let (inner, first_frame) = rt_handle
.block_on(rt_handle.spawn(tokio::time::timeout( .block_on(rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT, RETINA_TIMEOUT,
RetinaStreamInner::play(label, url, options), RetinaStreamInner::play(label, url, options),
))) )))
.expect("RetinaStream::play task panicked, see earlier error")??; .expect("RetinaStream::play task panicked, see earlier error")??;
let extra_data = h264::parse_extra_data(video_params.extra_data())?; Ok(Box::new(RetinaStream {
let stream = Box::new(RetinaStream {
inner: Some(inner), inner: Some(inner),
rt_handle, rt_handle,
first_frame: Some(first_frame), first_frame: Some(first_frame),
}); }))
Ok((extra_data, stream))
} }
} }
@ -102,6 +103,7 @@ struct RetinaStream {
struct RetinaStreamInner { struct RetinaStreamInner {
label: String, label: String,
session: Demuxed, session: Demuxed,
video_sample_entry: db::VideoSampleEntryToInsert,
} }
impl RetinaStreamInner { impl RetinaStreamInner {
@ -110,7 +112,7 @@ impl RetinaStreamInner {
label: String, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, options: retina::client::SessionOptions,
) -> Result<(Box<Self>, Box<VideoParameters>, retina::codec::VideoFrame), Error> { ) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> {
let mut session = retina::client::Session::describe(url, options).await?; let mut session = retina::client::Session::describe(url, options).await?;
log::debug!("connected to {:?}, tool {:?}", &label, session.tool()); log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
let (video_i, mut video_params) = session let (video_i, mut video_params) = session
@ -151,12 +153,17 @@ impl RetinaStreamInner {
Some(Ok(_)) => {} Some(Ok(_)) => {}
} }
}; };
let self_ = Box::new(Self { label, session }); let video_sample_entry = h264::parse_extra_data(
Ok(( video_params
self_, .ok_or_else(|| format_err!("couldn't find H.264 parameters"))?
video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?, .extra_data(),
first_frame, )?;
)) let self_ = Box::new(Self {
label,
session,
video_sample_entry,
});
Ok((self_, first_frame))
} }
/// Fetches a non-initial frame. /// Fetches a non-initial frame.
@ -167,10 +174,6 @@ impl RetinaStreamInner {
match Pin::new(&mut self.session).next().await.transpose()? { match Pin::new(&mut self.session).next().await.transpose()? {
None => bail!("end of stream"), None => bail!("end of stream"),
Some(CodecItem::VideoFrame(v)) => { Some(CodecItem::VideoFrame(v)) => {
if let Some(p) = v.new_parameters {
// TODO: we could start a new recording without dropping the connection.
bail!("parameter change: {:?}", p);
}
if v.loss > 0 { if v.loss > 0 {
log::warn!( log::warn!(
"{}: lost {} RTP packets @ {}", "{}: lost {} RTP packets @ {}",
@ -192,25 +195,48 @@ impl Stream for RetinaStream {
self.inner.as_ref().unwrap().session.tool() self.inner.as_ref().unwrap().session.tool()
} }
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
&self.inner.as_ref().unwrap().video_sample_entry
}
fn next(&mut self) -> Result<VideoFrame, Error> { fn next(&mut self) -> Result<VideoFrame, Error> {
let frame = self.first_frame.take().map(Ok).unwrap_or_else(move || { let (frame, new_video_sample_entry) = self
let inner = self.inner.take().unwrap(); .first_frame
let (inner, frame) = self .take()
.rt_handle .map(|f| Ok((f, false)))
.block_on(self.rt_handle.spawn(tokio::time::timeout( .unwrap_or_else(move || {
RETINA_TIMEOUT, let inner = self.inner.take().unwrap();
inner.fetch_next_frame(), let (mut inner, mut frame) = self
))) .rt_handle
.expect("fetch_next_frame task panicked, see earlier error") .block_on(self.rt_handle.spawn(tokio::time::timeout(
.map_err(|_| format_err!("timeout getting next frame"))??; RETINA_TIMEOUT,
self.inner = Some(inner); inner.fetch_next_frame(),
Ok::<_, failure::Error>(frame) )))
})?; .expect("fetch_next_frame task panicked, see earlier error")
.map_err(|_| format_err!("timeout getting next frame"))??;
let mut new_video_sample_entry = false;
if let Some(p) = frame.new_parameters.take() {
let video_sample_entry = h264::parse_extra_data(p.extra_data())?;
if video_sample_entry != inner.video_sample_entry {
log::debug!(
"{}: parameter change:\nold: {:?}\nnew: {:?}",
&inner.label,
&inner.video_sample_entry,
&video_sample_entry
);
inner.video_sample_entry = video_sample_entry;
new_video_sample_entry = true;
}
};
self.inner = Some(inner);
Ok::<_, failure::Error>((frame, new_video_sample_entry))
})?;
Ok(VideoFrame { Ok(VideoFrame {
pts: frame.timestamp.elapsed(), pts: frame.timestamp.elapsed(),
duration: 0, duration: 0,
is_key: frame.is_random_access_point, is_key: frame.is_random_access_point,
data: frame.into_data(), data: frame.into_data(),
new_video_sample_entry,
}) })
} }
} }
@ -225,11 +251,12 @@ pub mod testutil {
reader: mp4::Mp4Reader<Cursor<Vec<u8>>>, reader: mp4::Mp4Reader<Cursor<Vec<u8>>>,
h264_track_id: u32, h264_track_id: u32,
next_sample_id: u32, next_sample_id: u32,
video_sample_entry: db::VideoSampleEntryToInsert,
} }
impl Mp4Stream { impl Mp4Stream {
/// Opens a stream, with a return matching that expected by [`Opener`]. /// Opens a stream, with a return matching that expected by [`Opener`].
pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> { pub fn open(path: &str) -> Result<Self, Error> {
let f = std::fs::read(path)?; let f = std::fs::read(path)?;
let len = f.len(); let len = f.len();
let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?; let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?;
@ -241,14 +268,15 @@ pub mod testutil {
None => bail!("expected a H.264 track"), None => bail!("expected a H.264 track"),
Some(t) => t, Some(t) => t,
}; };
let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?; let video_sample_entry = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
let h264_track_id = h264_track.track_id(); let h264_track_id = h264_track.track_id();
let stream = Mp4Stream { let stream = Mp4Stream {
reader, reader,
h264_track_id, h264_track_id,
next_sample_id: 1, next_sample_id: 1,
video_sample_entry,
}; };
Ok((extra_data, stream)) Ok(stream)
} }
pub fn duration(&self) -> u64 { pub fn duration(&self) -> u64 {
@ -282,7 +310,12 @@ pub mod testutil {
duration: sample.duration as i32, duration: sample.duration as i32,
is_key: sample.is_sync, is_key: sample.is_sync,
data: sample.bytes, data: sample.bytes,
new_video_sample_entry: false,
}) })
} }
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
&self.video_sample_entry
}
} }
} }

View File

@ -159,7 +159,7 @@ where
} }
} }
let (extra_data, mut stream) = { let mut stream = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open( self.opener.open(
self.short_name.clone(), self.short_name.clone(),
@ -178,34 +178,33 @@ where
)? )?
}; };
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
let video_sample_entry_id = { let mut video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db.lock().insert_video_sample_entry(extra_data)? self.db
.lock()
.insert_video_sample_entry(stream.video_sample_entry().clone())?
}; };
let mut seen_key_frame = false; let mut seen_key_frame = false;
// Seconds since epoch at which to next rotate. // Seconds since epoch at which to next rotate. See comment at start
// of while loop.
let mut rotate: Option<i64> = None; let mut rotate: Option<i64> = None;
let mut w = writer::Writer::new( let mut w = writer::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id);
&self.dir,
&self.db,
&self.syncer_channel,
self.stream_id,
video_sample_entry_id,
);
while self.shutdown_rx.check().is_ok() { while self.shutdown_rx.check().is_ok() {
let pkt = { // `rotate` should now be set iff `w` has an open recording.
let frame = {
let _t = TimerGuard::new(&clocks, || "getting next packet"); let _t = TimerGuard::new(&clocks, || "getting next packet");
stream.next() stream.next()
}; };
let pkt = match pkt { let frame = match frame {
Ok(p) => p, Ok(f) => f,
Err(e) => { Err(e) => {
let _ = w.close(None, Some(e.to_string())); let _ = w.close(None, Some(e.to_string()));
return Err(e); return Err(e);
} }
}; };
if !seen_key_frame && !pkt.is_key { if !seen_key_frame && !frame.is_key {
continue; continue;
} else if !seen_key_frame { } else if !seen_key_frame {
debug!("{}: have first key frame", self.short_name); debug!("{}: have first key frame", self.short_name);
@ -214,10 +213,24 @@ where
let frame_realtime = clocks.monotonic() + realtime_offset; let frame_realtime = clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime); let local_time = recording::Time::new(frame_realtime);
rotate = if let Some(r) = rotate { rotate = if let Some(r) = rotate {
if frame_realtime.sec > r && pkt.is_key { if frame_realtime.sec > r && frame.is_key {
trace!("{}: write on normal rotation", self.short_name); trace!("{}: close on normal rotation", self.short_name);
let _t = TimerGuard::new(&clocks, || "closing writer"); let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(pkt.pts), None)?; w.close(Some(frame.pts), None)?;
None
} else if frame.new_video_sample_entry {
if !frame.is_key {
bail!("parameter change on non-key frame");
}
trace!("{}: close on parameter change", self.short_name);
video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db
.lock()
.insert_video_sample_entry(stream.video_sample_entry().clone())?
};
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(frame.pts), None)?;
None None
} else { } else {
Some(r) Some(r)
@ -249,13 +262,14 @@ where
r r
} }
}; };
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len())); let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", frame.data.len()));
w.write( w.write(
&mut self.shutdown_rx, &mut self.shutdown_rx,
&pkt.data[..], &frame.data[..],
local_time, local_time,
pkt.pts, frame.pts,
pkt.is_key, frame.is_key,
video_sample_entry_id,
)?; )?;
rotate = Some(r); rotate = Some(r);
} }
@ -314,6 +328,10 @@ mod tests {
self.inner.tool() self.inner.tool()
} }
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
self.inner.video_sample_entry()
}
fn next(&mut self) -> Result<stream::VideoFrame, Error> { fn next(&mut self) -> Result<stream::VideoFrame, Error> {
if self.pkts_left == 0 { if self.pkts_left == 0 {
bail!("end of stream"); bail!("end of stream");
@ -355,7 +373,7 @@ mod tests {
struct MockOpener { struct MockOpener {
expected_url: url::Url, expected_url: url::Url,
streams: Mutex<Vec<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>)>>, streams: Mutex<Vec<Box<dyn stream::Stream>>>,
shutdown_tx: Mutex<Option<base::shutdown::Sender>>, shutdown_tx: Mutex<Option<base::shutdown::Sender>>,
} }
@ -365,7 +383,7 @@ mod tests {
_label: String, _label: String,
url: url::Url, url: url::Url,
_options: retina::client::SessionOptions, _options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>), Error> { ) -> Result<Box<dyn stream::Stream>, Error> {
assert_eq!(&url, &self.expected_url); assert_eq!(&url, &self.expected_url);
let mut l = self.streams.lock(); let mut l = self.streams.lock();
match l.pop() { match l.pop() {
@ -412,8 +430,7 @@ mod tests {
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
let (extra_data, stream) = let stream = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut stream = let mut stream =
ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream)); ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream));
stream.ts_offset = 123456; // starting pts of the input should be irrelevant stream.ts_offset = 123456; // starting pts of the input should be irrelevant
@ -422,7 +439,7 @@ mod tests {
let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
let opener = MockOpener { let opener = MockOpener {
expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(), expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(),
streams: Mutex::new(vec![(extra_data, Box::new(stream))]), streams: Mutex::new(vec![Box::new(stream)]),
shutdown_tx: Mutex::new(Some(shutdown_tx)), shutdown_tx: Mutex::new(Some(shutdown_tx)),
}; };
let db = testutil::TestDb::new(clocks.clone()); let db = testutil::TestDb::new(clocks.clone());