mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-03-13 05:00:13 -04:00
seamless mid-stream video parameter changes
For #217. This handles the recording logic. May still need fixes to playback and/or live stream logic.
This commit is contained in:
parent
71d3f2a946
commit
3bc552b950
@ -9,6 +9,9 @@ Each release is tagged in Git and on the Docker repository
|
|||||||
## unreleased
|
## unreleased
|
||||||
|
|
||||||
* upgrade to Retina 0.3.9, improving camera interop and diagnostics
|
* upgrade to Retina 0.3.9, improving camera interop and diagnostics
|
||||||
|
* [#217](https://github.com/scottlamb/moonfire-nvr/issues/217): no longer
|
||||||
|
drop the connection to the camera when it changes video parameters, instead
|
||||||
|
continuing the run seamlessly.
|
||||||
|
|
||||||
## `v0.7.3` (2022-03-22)
|
## `v0.7.3` (2022-03-22)
|
||||||
|
|
||||||
|
@ -156,7 +156,7 @@ impl VideoSampleEntry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq)]
|
#[derive(Clone, PartialEq, Eq)]
|
||||||
pub struct VideoSampleEntryToInsert {
|
pub struct VideoSampleEntryToInsert {
|
||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
pub rfc6381_codec: String,
|
pub rfc6381_codec: String,
|
||||||
|
@ -615,7 +615,6 @@ pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
|
|||||||
db: &'a db::Database<C>,
|
db: &'a db::Database<C>,
|
||||||
channel: &'a SyncerChannel<D::File>,
|
channel: &'a SyncerChannel<D::File>,
|
||||||
stream_id: i32,
|
stream_id: i32,
|
||||||
video_sample_entry_id: i32,
|
|
||||||
state: WriterState<D::File>,
|
state: WriterState<D::File>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -634,6 +633,7 @@ struct InnerWriter<F: FileWriter> {
|
|||||||
r: Arc<Mutex<db::RecordingToInsert>>,
|
r: Arc<Mutex<db::RecordingToInsert>>,
|
||||||
e: recording::SampleIndexEncoder,
|
e: recording::SampleIndexEncoder,
|
||||||
id: CompositeId,
|
id: CompositeId,
|
||||||
|
video_sample_entry_id: i32,
|
||||||
|
|
||||||
hasher: blake3::Hasher,
|
hasher: blake3::Hasher,
|
||||||
|
|
||||||
@ -680,26 +680,34 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
|||||||
db: &'a db::Database<C>,
|
db: &'a db::Database<C>,
|
||||||
channel: &'a SyncerChannel<D::File>,
|
channel: &'a SyncerChannel<D::File>,
|
||||||
stream_id: i32,
|
stream_id: i32,
|
||||||
video_sample_entry_id: i32,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Writer {
|
Writer {
|
||||||
dir,
|
dir,
|
||||||
db,
|
db,
|
||||||
channel,
|
channel,
|
||||||
stream_id,
|
stream_id,
|
||||||
video_sample_entry_id,
|
|
||||||
state: WriterState::Unopened,
|
state: WriterState::Unopened,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Opens a new writer.
|
/// Opens a new recording if not already open.
|
||||||
|
///
|
||||||
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
|
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
|
||||||
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
|
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
|
||||||
/// correcting this.
|
/// correcting this.
|
||||||
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
|
fn open(
|
||||||
|
&mut self,
|
||||||
|
shutdown_rx: &mut base::shutdown::Receiver,
|
||||||
|
video_sample_entry_id: i32,
|
||||||
|
) -> Result<(), Error> {
|
||||||
let prev = match self.state {
|
let prev = match self.state {
|
||||||
WriterState::Unopened => None,
|
WriterState::Unopened => None,
|
||||||
WriterState::Open(_) => return Ok(()),
|
WriterState::Open(ref o) => {
|
||||||
|
if o.video_sample_entry_id != video_sample_entry_id {
|
||||||
|
bail!("inconsistent video_sample_entry_id");
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
WriterState::Closed(prev) => Some(prev),
|
WriterState::Closed(prev) => Some(prev),
|
||||||
};
|
};
|
||||||
let (id, r) = self.db.lock().add_recording(
|
let (id, r) = self.db.lock().add_recording(
|
||||||
@ -709,7 +717,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
|||||||
start: prev
|
start: prev
|
||||||
.map(|p| p.end)
|
.map(|p| p.end)
|
||||||
.unwrap_or(recording::Time(i64::max_value())),
|
.unwrap_or(recording::Time(i64::max_value())),
|
||||||
video_sample_entry_id: self.video_sample_entry_id,
|
video_sample_entry_id,
|
||||||
flags: db::RecordingFlags::Growing as i32,
|
flags: db::RecordingFlags::Growing as i32,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
@ -726,6 +734,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
|||||||
hasher: blake3::Hasher::new(),
|
hasher: blake3::Hasher::new(),
|
||||||
local_start: recording::Time(i64::max_value()),
|
local_start: recording::Time(i64::max_value()),
|
||||||
unindexed_sample: None,
|
unindexed_sample: None,
|
||||||
|
video_sample_entry_id,
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -747,8 +756,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
|||||||
local_time: recording::Time,
|
local_time: recording::Time,
|
||||||
pts_90k: i64,
|
pts_90k: i64,
|
||||||
is_key: bool,
|
is_key: bool,
|
||||||
|
video_sample_entry_id: i32,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.open(shutdown_rx)?;
|
self.open(shutdown_rx, video_sample_entry_id)?;
|
||||||
let w = match self.state {
|
let w = match self.state {
|
||||||
WriterState::Open(ref mut w) => w,
|
WriterState::Open(ref mut w) => w,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
@ -816,9 +826,14 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
|
/// Cleanly closes a single recording within this writer, using a supplied
|
||||||
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
|
/// pts of the next sample for the last sample's duration (if known).
|
||||||
/// swallowing errors and using a zero duration for the last sample.
|
///
|
||||||
|
/// The `Writer` may be used again, causing another recording to be created
|
||||||
|
/// within the same run.
|
||||||
|
///
|
||||||
|
/// If the `Writer` is dropped without `close`, the `Drop` trait impl will
|
||||||
|
/// close, swallowing errors and using a zero duration for the last sample.
|
||||||
pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
|
pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
|
||||||
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
|
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
|
||||||
WriterState::Open(w) => {
|
WriterState::Open(w) => {
|
||||||
@ -1179,13 +1194,7 @@ mod tests {
|
|||||||
rfc6381_codec: "avc1.000000".to_owned(),
|
rfc6381_codec: "avc1.000000".to_owned(),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut w = Writer::new(
|
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
|
||||||
&h.dir,
|
|
||||||
&h.db,
|
|
||||||
&h.channel,
|
|
||||||
testutil::TEST_STREAM_ID,
|
|
||||||
video_sample_entry_id,
|
|
||||||
);
|
|
||||||
h.dir.expect(MockDirAction::Create(
|
h.dir.expect(MockDirAction::Create(
|
||||||
CompositeId::new(1, 0),
|
CompositeId::new(1, 0),
|
||||||
Box::new(|_id| Err(nix::Error::EIO)),
|
Box::new(|_id| Err(nix::Error::EIO)),
|
||||||
@ -1200,7 +1209,14 @@ mod tests {
|
|||||||
));
|
));
|
||||||
f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
|
f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
|
||||||
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||||
w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true)
|
w.write(
|
||||||
|
&mut h.shutdown_rx,
|
||||||
|
b"1",
|
||||||
|
recording::Time(1),
|
||||||
|
0,
|
||||||
|
true,
|
||||||
|
video_sample_entry_id,
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let e = w
|
let e = w
|
||||||
|
@ -211,7 +211,7 @@ fn press_test_inner(
|
|||||||
transport: retina::client::Transport,
|
transport: retina::client::Transport,
|
||||||
) -> Result<String, Error> {
|
) -> Result<String, Error> {
|
||||||
let _enter = handle.enter();
|
let _enter = handle.enter();
|
||||||
let (extra_data, stream) = stream::OPENER.open(
|
let stream = stream::OPENER.open(
|
||||||
"test stream".to_owned(),
|
"test stream".to_owned(),
|
||||||
url,
|
url,
|
||||||
retina::client::SessionOptions::default()
|
retina::client::SessionOptions::default()
|
||||||
@ -222,10 +222,17 @@ fn press_test_inner(
|
|||||||
})
|
})
|
||||||
.transport(transport),
|
.transport(transport),
|
||||||
)?;
|
)?;
|
||||||
|
let video_sample_entry = stream.video_sample_entry();
|
||||||
Ok(format!(
|
Ok(format!(
|
||||||
"{}x{} video stream served by tool {:?}",
|
"codec: {}\n\
|
||||||
extra_data.width,
|
dimensions: {}x{}\n\
|
||||||
extra_data.height,
|
pixel aspect ratio: {}x{}\n\
|
||||||
|
tool: {:?}",
|
||||||
|
&video_sample_entry.rfc6381_codec,
|
||||||
|
video_sample_entry.width,
|
||||||
|
video_sample_entry.height,
|
||||||
|
video_sample_entry.pasp_h_spacing,
|
||||||
|
video_sample_entry.pasp_v_spacing,
|
||||||
stream.tool(),
|
stream.tool(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
@ -2278,21 +2278,18 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
|
fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
|
||||||
let (extra_data, input) =
|
let input = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
||||||
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
|
||||||
let mut input: Box<dyn stream::Stream> = Box::new(input);
|
let mut input: Box<dyn stream::Stream> = Box::new(input);
|
||||||
|
|
||||||
// 2015-04-26 00:00:00 UTC.
|
// 2015-04-26 00:00:00 UTC.
|
||||||
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
|
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
|
||||||
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap();
|
let video_sample_entry_id = db
|
||||||
|
.db
|
||||||
|
.lock()
|
||||||
|
.insert_video_sample_entry(input.video_sample_entry().clone())
|
||||||
|
.unwrap();
|
||||||
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
|
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
|
||||||
let mut output = writer::Writer::new(
|
let mut output = writer::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID);
|
||||||
dir,
|
|
||||||
&db.db,
|
|
||||||
&db.syncer_channel,
|
|
||||||
TEST_STREAM_ID,
|
|
||||||
video_sample_entry_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
// end_pts is the pts of the end of the most recent frame (start + duration).
|
// end_pts is the pts of the end of the most recent frame (start + duration).
|
||||||
// It's needed because dir::Writer calculates a packet's duration from its pts and the
|
// It's needed because dir::Writer calculates a packet's duration from its pts and the
|
||||||
@ -2321,6 +2318,7 @@ mod tests {
|
|||||||
frame_time,
|
frame_time,
|
||||||
pkt.pts,
|
pkt.pts,
|
||||||
pkt.is_key,
|
pkt.is_key,
|
||||||
|
video_sample_entry_id,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
end_pts = Some(pkt.pts + i64::from(pkt.duration));
|
end_pts = Some(pkt.pts + i64::from(pkt.duration));
|
||||||
@ -2391,9 +2389,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
|
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
|
||||||
let (orig_extra_data, orig) =
|
let orig = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
||||||
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
let new = stream::testutil::Mp4Stream::open(new_filename).unwrap();
|
||||||
let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap();
|
|
||||||
|
|
||||||
if pts_offset > 0 {
|
if pts_offset > 0 {
|
||||||
// The mp4 crate doesn't interpret the edit list. Manually inspect it.
|
// The mp4 crate doesn't interpret the edit list. Manually inspect it.
|
||||||
@ -2410,7 +2407,7 @@ mod tests {
|
|||||||
|
|
||||||
let mut orig: Box<dyn stream::Stream> = Box::new(orig);
|
let mut orig: Box<dyn stream::Stream> = Box::new(orig);
|
||||||
let mut new: Box<dyn stream::Stream> = Box::new(new);
|
let mut new: Box<dyn stream::Stream> = Box::new(new);
|
||||||
assert_eq!(orig_extra_data, new_extra_data);
|
assert_eq!(orig.video_sample_entry(), new.video_sample_entry());
|
||||||
let mut final_durations = None;
|
let mut final_durations = None;
|
||||||
for i in 0.. {
|
for i in 0.. {
|
||||||
let orig_pkt = match orig.next() {
|
let orig_pkt = match orig.next() {
|
||||||
|
@ -8,7 +8,7 @@ use failure::format_err;
|
|||||||
use failure::{bail, Error};
|
use failure::{bail, Error};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use retina::client::Demuxed;
|
use retina::client::Demuxed;
|
||||||
use retina::codec::{CodecItem, VideoParameters};
|
use retina::codec::CodecItem;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::result::Result;
|
use std::result::Result;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
@ -26,7 +26,7 @@ pub trait Opener: Send + Sync {
|
|||||||
label: String,
|
label: String,
|
||||||
url: Url,
|
url: Url,
|
||||||
options: retina::client::SessionOptions,
|
options: retina::client::SessionOptions,
|
||||||
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error>;
|
) -> Result<Box<dyn Stream>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct VideoFrame {
|
pub struct VideoFrame {
|
||||||
@ -38,10 +38,13 @@ pub struct VideoFrame {
|
|||||||
|
|
||||||
pub is_key: bool,
|
pub is_key: bool,
|
||||||
pub data: Bytes,
|
pub data: Bytes,
|
||||||
|
|
||||||
|
pub new_video_sample_entry: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Stream: Send {
|
pub trait Stream: Send {
|
||||||
fn tool(&self) -> Option<&retina::client::Tool>;
|
fn tool(&self) -> Option<&retina::client::Tool>;
|
||||||
|
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert;
|
||||||
fn next(&mut self) -> Result<VideoFrame, Error>;
|
fn next(&mut self) -> Result<VideoFrame, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,22 +58,20 @@ impl Opener for RealOpener {
|
|||||||
label: String,
|
label: String,
|
||||||
url: Url,
|
url: Url,
|
||||||
options: retina::client::SessionOptions,
|
options: retina::client::SessionOptions,
|
||||||
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> {
|
) -> Result<Box<dyn Stream>, Error> {
|
||||||
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
|
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
|
||||||
let rt_handle = tokio::runtime::Handle::current();
|
let rt_handle = tokio::runtime::Handle::current();
|
||||||
let (inner, video_params, first_frame) = rt_handle
|
let (inner, first_frame) = rt_handle
|
||||||
.block_on(rt_handle.spawn(tokio::time::timeout(
|
.block_on(rt_handle.spawn(tokio::time::timeout(
|
||||||
RETINA_TIMEOUT,
|
RETINA_TIMEOUT,
|
||||||
RetinaStreamInner::play(label, url, options),
|
RetinaStreamInner::play(label, url, options),
|
||||||
)))
|
)))
|
||||||
.expect("RetinaStream::play task panicked, see earlier error")??;
|
.expect("RetinaStream::play task panicked, see earlier error")??;
|
||||||
let extra_data = h264::parse_extra_data(video_params.extra_data())?;
|
Ok(Box::new(RetinaStream {
|
||||||
let stream = Box::new(RetinaStream {
|
|
||||||
inner: Some(inner),
|
inner: Some(inner),
|
||||||
rt_handle,
|
rt_handle,
|
||||||
first_frame: Some(first_frame),
|
first_frame: Some(first_frame),
|
||||||
});
|
}))
|
||||||
Ok((extra_data, stream))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,6 +103,7 @@ struct RetinaStream {
|
|||||||
struct RetinaStreamInner {
|
struct RetinaStreamInner {
|
||||||
label: String,
|
label: String,
|
||||||
session: Demuxed,
|
session: Demuxed,
|
||||||
|
video_sample_entry: db::VideoSampleEntryToInsert,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RetinaStreamInner {
|
impl RetinaStreamInner {
|
||||||
@ -110,7 +112,7 @@ impl RetinaStreamInner {
|
|||||||
label: String,
|
label: String,
|
||||||
url: Url,
|
url: Url,
|
||||||
options: retina::client::SessionOptions,
|
options: retina::client::SessionOptions,
|
||||||
) -> Result<(Box<Self>, Box<VideoParameters>, retina::codec::VideoFrame), Error> {
|
) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> {
|
||||||
let mut session = retina::client::Session::describe(url, options).await?;
|
let mut session = retina::client::Session::describe(url, options).await?;
|
||||||
log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
|
log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
|
||||||
let (video_i, mut video_params) = session
|
let (video_i, mut video_params) = session
|
||||||
@ -151,12 +153,17 @@ impl RetinaStreamInner {
|
|||||||
Some(Ok(_)) => {}
|
Some(Ok(_)) => {}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let self_ = Box::new(Self { label, session });
|
let video_sample_entry = h264::parse_extra_data(
|
||||||
Ok((
|
video_params
|
||||||
self_,
|
.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?
|
||||||
video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?,
|
.extra_data(),
|
||||||
first_frame,
|
)?;
|
||||||
))
|
let self_ = Box::new(Self {
|
||||||
|
label,
|
||||||
|
session,
|
||||||
|
video_sample_entry,
|
||||||
|
});
|
||||||
|
Ok((self_, first_frame))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches a non-initial frame.
|
/// Fetches a non-initial frame.
|
||||||
@ -167,10 +174,6 @@ impl RetinaStreamInner {
|
|||||||
match Pin::new(&mut self.session).next().await.transpose()? {
|
match Pin::new(&mut self.session).next().await.transpose()? {
|
||||||
None => bail!("end of stream"),
|
None => bail!("end of stream"),
|
||||||
Some(CodecItem::VideoFrame(v)) => {
|
Some(CodecItem::VideoFrame(v)) => {
|
||||||
if let Some(p) = v.new_parameters {
|
|
||||||
// TODO: we could start a new recording without dropping the connection.
|
|
||||||
bail!("parameter change: {:?}", p);
|
|
||||||
}
|
|
||||||
if v.loss > 0 {
|
if v.loss > 0 {
|
||||||
log::warn!(
|
log::warn!(
|
||||||
"{}: lost {} RTP packets @ {}",
|
"{}: lost {} RTP packets @ {}",
|
||||||
@ -192,10 +195,18 @@ impl Stream for RetinaStream {
|
|||||||
self.inner.as_ref().unwrap().session.tool()
|
self.inner.as_ref().unwrap().session.tool()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
|
||||||
|
&self.inner.as_ref().unwrap().video_sample_entry
|
||||||
|
}
|
||||||
|
|
||||||
fn next(&mut self) -> Result<VideoFrame, Error> {
|
fn next(&mut self) -> Result<VideoFrame, Error> {
|
||||||
let frame = self.first_frame.take().map(Ok).unwrap_or_else(move || {
|
let (frame, new_video_sample_entry) = self
|
||||||
|
.first_frame
|
||||||
|
.take()
|
||||||
|
.map(|f| Ok((f, false)))
|
||||||
|
.unwrap_or_else(move || {
|
||||||
let inner = self.inner.take().unwrap();
|
let inner = self.inner.take().unwrap();
|
||||||
let (inner, frame) = self
|
let (mut inner, mut frame) = self
|
||||||
.rt_handle
|
.rt_handle
|
||||||
.block_on(self.rt_handle.spawn(tokio::time::timeout(
|
.block_on(self.rt_handle.spawn(tokio::time::timeout(
|
||||||
RETINA_TIMEOUT,
|
RETINA_TIMEOUT,
|
||||||
@ -203,14 +214,29 @@ impl Stream for RetinaStream {
|
|||||||
)))
|
)))
|
||||||
.expect("fetch_next_frame task panicked, see earlier error")
|
.expect("fetch_next_frame task panicked, see earlier error")
|
||||||
.map_err(|_| format_err!("timeout getting next frame"))??;
|
.map_err(|_| format_err!("timeout getting next frame"))??;
|
||||||
|
let mut new_video_sample_entry = false;
|
||||||
|
if let Some(p) = frame.new_parameters.take() {
|
||||||
|
let video_sample_entry = h264::parse_extra_data(p.extra_data())?;
|
||||||
|
if video_sample_entry != inner.video_sample_entry {
|
||||||
|
log::debug!(
|
||||||
|
"{}: parameter change:\nold: {:?}\nnew: {:?}",
|
||||||
|
&inner.label,
|
||||||
|
&inner.video_sample_entry,
|
||||||
|
&video_sample_entry
|
||||||
|
);
|
||||||
|
inner.video_sample_entry = video_sample_entry;
|
||||||
|
new_video_sample_entry = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
self.inner = Some(inner);
|
self.inner = Some(inner);
|
||||||
Ok::<_, failure::Error>(frame)
|
Ok::<_, failure::Error>((frame, new_video_sample_entry))
|
||||||
})?;
|
})?;
|
||||||
Ok(VideoFrame {
|
Ok(VideoFrame {
|
||||||
pts: frame.timestamp.elapsed(),
|
pts: frame.timestamp.elapsed(),
|
||||||
duration: 0,
|
duration: 0,
|
||||||
is_key: frame.is_random_access_point,
|
is_key: frame.is_random_access_point,
|
||||||
data: frame.into_data(),
|
data: frame.into_data(),
|
||||||
|
new_video_sample_entry,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -225,11 +251,12 @@ pub mod testutil {
|
|||||||
reader: mp4::Mp4Reader<Cursor<Vec<u8>>>,
|
reader: mp4::Mp4Reader<Cursor<Vec<u8>>>,
|
||||||
h264_track_id: u32,
|
h264_track_id: u32,
|
||||||
next_sample_id: u32,
|
next_sample_id: u32,
|
||||||
|
video_sample_entry: db::VideoSampleEntryToInsert,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Mp4Stream {
|
impl Mp4Stream {
|
||||||
/// Opens a stream, with a return matching that expected by [`Opener`].
|
/// Opens a stream, with a return matching that expected by [`Opener`].
|
||||||
pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> {
|
pub fn open(path: &str) -> Result<Self, Error> {
|
||||||
let f = std::fs::read(path)?;
|
let f = std::fs::read(path)?;
|
||||||
let len = f.len();
|
let len = f.len();
|
||||||
let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?;
|
let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?;
|
||||||
@ -241,14 +268,15 @@ pub mod testutil {
|
|||||||
None => bail!("expected a H.264 track"),
|
None => bail!("expected a H.264 track"),
|
||||||
Some(t) => t,
|
Some(t) => t,
|
||||||
};
|
};
|
||||||
let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
|
let video_sample_entry = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
|
||||||
let h264_track_id = h264_track.track_id();
|
let h264_track_id = h264_track.track_id();
|
||||||
let stream = Mp4Stream {
|
let stream = Mp4Stream {
|
||||||
reader,
|
reader,
|
||||||
h264_track_id,
|
h264_track_id,
|
||||||
next_sample_id: 1,
|
next_sample_id: 1,
|
||||||
|
video_sample_entry,
|
||||||
};
|
};
|
||||||
Ok((extra_data, stream))
|
Ok(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn duration(&self) -> u64 {
|
pub fn duration(&self) -> u64 {
|
||||||
@ -282,7 +310,12 @@ pub mod testutil {
|
|||||||
duration: sample.duration as i32,
|
duration: sample.duration as i32,
|
||||||
is_key: sample.is_sync,
|
is_key: sample.is_sync,
|
||||||
data: sample.bytes,
|
data: sample.bytes,
|
||||||
|
new_video_sample_entry: false,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
|
||||||
|
&self.video_sample_entry
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (extra_data, mut stream) = {
|
let mut stream = {
|
||||||
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
|
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
|
||||||
self.opener.open(
|
self.opener.open(
|
||||||
self.short_name.clone(),
|
self.short_name.clone(),
|
||||||
@ -178,34 +178,33 @@ where
|
|||||||
)?
|
)?
|
||||||
};
|
};
|
||||||
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
|
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
|
||||||
let video_sample_entry_id = {
|
let mut video_sample_entry_id = {
|
||||||
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
|
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
|
||||||
self.db.lock().insert_video_sample_entry(extra_data)?
|
self.db
|
||||||
|
.lock()
|
||||||
|
.insert_video_sample_entry(stream.video_sample_entry().clone())?
|
||||||
};
|
};
|
||||||
let mut seen_key_frame = false;
|
let mut seen_key_frame = false;
|
||||||
|
|
||||||
// Seconds since epoch at which to next rotate.
|
// Seconds since epoch at which to next rotate. See comment at start
|
||||||
|
// of while loop.
|
||||||
let mut rotate: Option<i64> = None;
|
let mut rotate: Option<i64> = None;
|
||||||
let mut w = writer::Writer::new(
|
let mut w = writer::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id);
|
||||||
&self.dir,
|
|
||||||
&self.db,
|
|
||||||
&self.syncer_channel,
|
|
||||||
self.stream_id,
|
|
||||||
video_sample_entry_id,
|
|
||||||
);
|
|
||||||
while self.shutdown_rx.check().is_ok() {
|
while self.shutdown_rx.check().is_ok() {
|
||||||
let pkt = {
|
// `rotate` should now be set iff `w` has an open recording.
|
||||||
|
|
||||||
|
let frame = {
|
||||||
let _t = TimerGuard::new(&clocks, || "getting next packet");
|
let _t = TimerGuard::new(&clocks, || "getting next packet");
|
||||||
stream.next()
|
stream.next()
|
||||||
};
|
};
|
||||||
let pkt = match pkt {
|
let frame = match frame {
|
||||||
Ok(p) => p,
|
Ok(f) => f,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = w.close(None, Some(e.to_string()));
|
let _ = w.close(None, Some(e.to_string()));
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if !seen_key_frame && !pkt.is_key {
|
if !seen_key_frame && !frame.is_key {
|
||||||
continue;
|
continue;
|
||||||
} else if !seen_key_frame {
|
} else if !seen_key_frame {
|
||||||
debug!("{}: have first key frame", self.short_name);
|
debug!("{}: have first key frame", self.short_name);
|
||||||
@ -214,10 +213,24 @@ where
|
|||||||
let frame_realtime = clocks.monotonic() + realtime_offset;
|
let frame_realtime = clocks.monotonic() + realtime_offset;
|
||||||
let local_time = recording::Time::new(frame_realtime);
|
let local_time = recording::Time::new(frame_realtime);
|
||||||
rotate = if let Some(r) = rotate {
|
rotate = if let Some(r) = rotate {
|
||||||
if frame_realtime.sec > r && pkt.is_key {
|
if frame_realtime.sec > r && frame.is_key {
|
||||||
trace!("{}: write on normal rotation", self.short_name);
|
trace!("{}: close on normal rotation", self.short_name);
|
||||||
let _t = TimerGuard::new(&clocks, || "closing writer");
|
let _t = TimerGuard::new(&clocks, || "closing writer");
|
||||||
w.close(Some(pkt.pts), None)?;
|
w.close(Some(frame.pts), None)?;
|
||||||
|
None
|
||||||
|
} else if frame.new_video_sample_entry {
|
||||||
|
if !frame.is_key {
|
||||||
|
bail!("parameter change on non-key frame");
|
||||||
|
}
|
||||||
|
trace!("{}: close on parameter change", self.short_name);
|
||||||
|
video_sample_entry_id = {
|
||||||
|
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
|
||||||
|
self.db
|
||||||
|
.lock()
|
||||||
|
.insert_video_sample_entry(stream.video_sample_entry().clone())?
|
||||||
|
};
|
||||||
|
let _t = TimerGuard::new(&clocks, || "closing writer");
|
||||||
|
w.close(Some(frame.pts), None)?;
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(r)
|
Some(r)
|
||||||
@ -249,13 +262,14 @@ where
|
|||||||
r
|
r
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len()));
|
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", frame.data.len()));
|
||||||
w.write(
|
w.write(
|
||||||
&mut self.shutdown_rx,
|
&mut self.shutdown_rx,
|
||||||
&pkt.data[..],
|
&frame.data[..],
|
||||||
local_time,
|
local_time,
|
||||||
pkt.pts,
|
frame.pts,
|
||||||
pkt.is_key,
|
frame.is_key,
|
||||||
|
video_sample_entry_id,
|
||||||
)?;
|
)?;
|
||||||
rotate = Some(r);
|
rotate = Some(r);
|
||||||
}
|
}
|
||||||
@ -314,6 +328,10 @@ mod tests {
|
|||||||
self.inner.tool()
|
self.inner.tool()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn video_sample_entry(&self) -> &db::VideoSampleEntryToInsert {
|
||||||
|
self.inner.video_sample_entry()
|
||||||
|
}
|
||||||
|
|
||||||
fn next(&mut self) -> Result<stream::VideoFrame, Error> {
|
fn next(&mut self) -> Result<stream::VideoFrame, Error> {
|
||||||
if self.pkts_left == 0 {
|
if self.pkts_left == 0 {
|
||||||
bail!("end of stream");
|
bail!("end of stream");
|
||||||
@ -355,7 +373,7 @@ mod tests {
|
|||||||
|
|
||||||
struct MockOpener {
|
struct MockOpener {
|
||||||
expected_url: url::Url,
|
expected_url: url::Url,
|
||||||
streams: Mutex<Vec<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>)>>,
|
streams: Mutex<Vec<Box<dyn stream::Stream>>>,
|
||||||
shutdown_tx: Mutex<Option<base::shutdown::Sender>>,
|
shutdown_tx: Mutex<Option<base::shutdown::Sender>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -365,7 +383,7 @@ mod tests {
|
|||||||
_label: String,
|
_label: String,
|
||||||
url: url::Url,
|
url: url::Url,
|
||||||
_options: retina::client::SessionOptions,
|
_options: retina::client::SessionOptions,
|
||||||
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>), Error> {
|
) -> Result<Box<dyn stream::Stream>, Error> {
|
||||||
assert_eq!(&url, &self.expected_url);
|
assert_eq!(&url, &self.expected_url);
|
||||||
let mut l = self.streams.lock();
|
let mut l = self.streams.lock();
|
||||||
match l.pop() {
|
match l.pop() {
|
||||||
@ -412,8 +430,7 @@ mod tests {
|
|||||||
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
|
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
|
||||||
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
|
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
|
||||||
|
|
||||||
let (extra_data, stream) =
|
let stream = stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
||||||
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
|
|
||||||
let mut stream =
|
let mut stream =
|
||||||
ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream));
|
ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream));
|
||||||
stream.ts_offset = 123456; // starting pts of the input should be irrelevant
|
stream.ts_offset = 123456; // starting pts of the input should be irrelevant
|
||||||
@ -422,7 +439,7 @@ mod tests {
|
|||||||
let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
|
let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
|
||||||
let opener = MockOpener {
|
let opener = MockOpener {
|
||||||
expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(),
|
expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(),
|
||||||
streams: Mutex::new(vec![(extra_data, Box::new(stream))]),
|
streams: Mutex::new(vec![Box::new(stream)]),
|
||||||
shutdown_tx: Mutex::new(Some(shutdown_tx)),
|
shutdown_tx: Mutex::new(Some(shutdown_tx)),
|
||||||
};
|
};
|
||||||
let db = testutil::TestDb::new(clocks.clone());
|
let db = testutil::TestDb::new(clocks.clone());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user