reduce thread handoffs in RetinaStream

This commit is contained in:
Scott Lamb 2022-04-13 13:22:26 -07:00
parent 7b0a489541
commit 967834ce15
1 changed files with 52 additions and 35 deletions

View File

@ -58,14 +58,15 @@ impl Opener for RealOpener {
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> { ) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> {
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
let rt_handle = tokio::runtime::Handle::current(); let rt_handle = tokio::runtime::Handle::current();
let (session, video_params, first_frame) = rt_handle.block_on(tokio::time::timeout( let (inner, video_params, first_frame) = rt_handle
RETINA_TIMEOUT, .block_on(rt_handle.spawn(tokio::time::timeout(
RetinaStream::play(&label, url, options), RETINA_TIMEOUT,
))??; RetinaStreamInner::play(label, url, options),
)))
.expect("RetinaStream::play task panicked, see earlier error")??;
let extra_data = h264::parse_extra_data(video_params.extra_data())?; let extra_data = h264::parse_extra_data(video_params.extra_data())?;
let stream = Box::new(RetinaStream { let stream = Box::new(RetinaStream {
label, inner: Some(inner),
session,
rt_handle, rt_handle,
first_frame: Some(first_frame), first_frame: Some(first_frame),
}); });
@ -73,9 +74,22 @@ impl Opener for RealOpener {
} }
} }
/// Real stream, implemented with the Retina library.
///
/// Retina is asynchronous and tokio-based where currently Moonfire expects
/// a synchronous stream interface. This blocks on the tokio operations.
///
/// Experimentally, it appears faster to have one thread hand-off per frame via
/// `handle.block_on(handle.spawn(...))` rather than the same without the
/// `handle.spawn(...)`. See
/// [#206](https://github.com/scottlamb/moonfire-nvr/issues/206).
struct RetinaStream { struct RetinaStream {
label: String, /// The actual stream details used from within the tokio reactor.
session: Pin<Box<Demuxed>>, ///
/// Spawned tokio tasks must be `'static`, so ownership is passed to the
/// task, and then returned when it completes.
inner: Option<Box<RetinaStreamInner>>,
rt_handle: tokio::runtime::Handle, rt_handle: tokio::runtime::Handle,
/// The first frame, if not yet returned from `next`. /// The first frame, if not yet returned from `next`.
@ -85,22 +99,20 @@ struct RetinaStream {
first_frame: Option<retina::codec::VideoFrame>, first_frame: Option<retina::codec::VideoFrame>,
} }
impl RetinaStream { struct RetinaStreamInner {
label: String,
session: Demuxed,
}
impl RetinaStreamInner {
/// Plays to first frame. No timeout; that's the caller's responsibility. /// Plays to first frame. No timeout; that's the caller's responsibility.
async fn play( async fn play(
label: &str, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, options: retina::client::SessionOptions,
) -> Result< ) -> Result<(Box<Self>, Box<VideoParameters>, retina::codec::VideoFrame), Error> {
(
Pin<Box<retina::client::Demuxed>>,
Box<VideoParameters>,
retina::codec::VideoFrame,
),
Error,
> {
let mut session = retina::client::Session::describe(url, options).await?; let mut session = retina::client::Session::describe(url, options).await?;
log::debug!("connected to {:?}, tool {:?}", label, session.tool()); log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
let (video_i, mut video_params) = session let (video_i, mut video_params) = session
.streams() .streams()
.iter() .iter()
@ -121,11 +133,11 @@ impl RetinaStream {
.ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?;
session.setup(video_i).await?; session.setup(video_i).await?;
let session = session.play(retina::client::PlayOptions::default()).await?; let session = session.play(retina::client::PlayOptions::default()).await?;
let mut session = Box::pin(session.demuxed()?); let mut session = session.demuxed()?;
// First frame. // First frame.
let first_frame = loop { let first_frame = loop {
match session.next().await { match Pin::new(&mut session).next().await {
None => bail!("stream closed before first frame"), None => bail!("stream closed before first frame"),
Some(Err(e)) => return Err(e.into()), Some(Err(e)) => return Err(e.into()),
Some(Ok(CodecItem::VideoFrame(mut v))) => { Some(Ok(CodecItem::VideoFrame(mut v))) => {
@ -139,8 +151,9 @@ impl RetinaStream {
Some(Ok(_)) => {} Some(Ok(_)) => {}
} }
}; };
let self_ = Box::new(Self { label, session });
Ok(( Ok((
session, self_,
video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?, video_params.ok_or_else(|| format_err!("couldn't find H.264 parameters"))?,
first_frame, first_frame,
)) ))
@ -148,11 +161,10 @@ impl RetinaStream {
/// Fetches a non-initial frame. /// Fetches a non-initial frame.
async fn fetch_next_frame( async fn fetch_next_frame(
label: &str, mut self: Box<Self>,
mut session: Pin<&mut Demuxed>, ) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> {
) -> Result<retina::codec::VideoFrame, Error> {
loop { loop {
match session.next().await.transpose()? { match Pin::new(&mut self.session).next().await.transpose()? {
None => bail!("end of stream"), None => bail!("end of stream"),
Some(CodecItem::VideoFrame(v)) => { Some(CodecItem::VideoFrame(v)) => {
if let Some(p) = v.new_parameters { if let Some(p) = v.new_parameters {
@ -162,12 +174,12 @@ impl RetinaStream {
if v.loss > 0 { if v.loss > 0 {
log::warn!( log::warn!(
"{}: lost {} RTP packets @ {}", "{}: lost {} RTP packets @ {}",
&label, &self.label,
v.loss, v.loss,
v.start_ctx() v.start_ctx()
); );
} }
return Ok(v); return Ok((self, v));
} }
Some(_) => {} Some(_) => {}
} }
@ -177,17 +189,22 @@ impl RetinaStream {
impl Stream for RetinaStream { impl Stream for RetinaStream {
fn tool(&self) -> Option<&retina::client::Tool> { fn tool(&self) -> Option<&retina::client::Tool> {
Pin::into_inner(self.session.as_ref()).tool() self.inner.as_ref().unwrap().session.tool()
} }
fn next(&mut self) -> Result<VideoFrame, Error> { fn next(&mut self) -> Result<VideoFrame, Error> {
let frame = self.first_frame.take().map(Ok).unwrap_or_else(|| { let frame = self.first_frame.take().map(Ok).unwrap_or_else(move || {
self.rt_handle let inner = self.inner.take().unwrap();
.block_on(tokio::time::timeout( let (inner, frame) = self
.rt_handle
.block_on(self.rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT, RETINA_TIMEOUT,
RetinaStream::fetch_next_frame(&self.label, self.session.as_mut()), inner.fetch_next_frame(),
)) )))
.map_err(|_| format_err!("timeout getting next frame"))? .expect("fetch_next_frame task panicked, see earlier error")
.map_err(|_| format_err!("timeout getting next frame"))??;
self.inner = Some(inner);
Ok::<_, failure::Error>(frame)
})?; })?;
Ok(VideoFrame { Ok(VideoFrame {
pts: frame.timestamp.elapsed(), pts: frame.timestamp.elapsed(),