make retina's behavior more like ffmpeg's

*   have a timeout for opening the connection and getting the next
    video frame. The former is quite important. The latter is arguably
    redundant with the keepalive timer, but this ensures we actually
    get a full frame in this timespan rather than some keepalive
    responses, RTCP sender reports, or partial frames.
*   don't drop extra stuff on loss; just note it. I'm not sure what the
    right behavior is but I think I shouldn't change too much at once.
This commit is contained in:
Scott Lamb 2021-06-28 16:27:28 -07:00
parent a0ed74e8e0
commit 7034480cfe

View File

@ -9,15 +9,18 @@ use failure::{bail, Error};
use futures::StreamExt; use futures::StreamExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::warn; use log::warn;
use retina::client::{Credentials, Playing, Session}; use retina::client::Credentials;
use retina::codec::{CodecItem, VideoParameters}; use retina::codec::{CodecItem, VideoParameters};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::ffi::CString; use std::ffi::CString;
use std::pin::Pin;
use std::result::Result; use std::result::Result;
use url::Url; use url::Url;
static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); static START_FFMPEG: parking_lot::Once = parking_lot::Once::new();
static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
lazy_static! { lazy_static! {
pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); pub static ref FFMPEG: Ffmpeg = Ffmpeg::new();
} }
@ -289,89 +292,44 @@ impl Opener for RetinaOpener {
// TODO: connection timeout. // TODO: connection timeout.
handle.spawn(async move { handle.spawn(async move {
let (session, mut video_params) = match RetinaOpener::play(url, creds).await { let r = tokio::time::timeout(RETINA_TIMEOUT, RetinaOpener::play(url, creds)).await;
let (mut session, video_params, first_frame) =
match r.unwrap_or_else(|_| Err(format_err!("timeout opening stream"))) {
Err(e) => { Err(e) => {
let _ = startup_tx.send(Err(e)); let _ = startup_tx.send(Err(e));
return; return;
} }
Ok((s, v)) => (s, v), Ok((s, p, f)) => (s, p, f),
}; };
let session = match session.demuxed() {
Ok(s) => s,
Err(e) => {
let _ = startup_tx.send(Err(e));
return;
}
};
tokio::pin!(session);
// First frame.
loop {
match session.next().await {
Some(Err(e)) => {
let _ = startup_tx.send(Err(e));
return;
}
Some(Ok(CodecItem::VideoFrame(mut v))) => {
if let Some(v) = v.new_parameters.take() {
video_params = v;
}
if v.is_random_access_point {
if startup_tx.send(Ok(video_params)).is_err() { if startup_tx.send(Ok(video_params)).is_err() {
return; return;
} }
if frame_tx.send(Ok(v)).await.is_err() { if frame_tx.send(Ok(first_frame)).await.is_err() {
return; return;
} }
break;
}
}
Some(Ok(_)) => {}
None => {
let _ =
startup_tx.send(Err(format_err!("stream closed before first frame")));
return;
}
}
}
// Following frames. // Read following frames.
let mut need_key_frame = false; let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
while let Some(item) = session.next().await { loop {
let item = tokio::time::timeout_at(deadline, session.next())
.await
.unwrap_or_else(|_| Some(Err(format_err!("timeout getting next frame"))));
match item { match item {
Err(e) => { Some(Err(e)) => {
let _ = frame_tx.send(Err(e)).await; let _ = frame_tx.send(Err(e)).await;
return; return;
} }
Ok(CodecItem::VideoFrame(v)) => { None => break,
Some(Ok(CodecItem::VideoFrame(v))) => {
deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
if v.loss > 0 { if v.loss > 0 {
if !v.is_random_access_point { log::warn!("lost {} RTP packets @ {:?}", v.loss, v.start_ctx());
log::info!(
"lost {} RTP packets; waiting for next key frame @ {:?}",
v.loss,
v.start_ctx()
);
need_key_frame = true;
continue;
} else {
log::info!(
"lost {} RTP packets; already have key frame @ {:?}",
v.loss,
v.start_ctx()
);
need_key_frame = false;
}
} else if need_key_frame && !v.is_random_access_point {
continue;
} else if need_key_frame {
log::info!("recovering from loss with key frame @ {:?}", v.start_ctx());
need_key_frame = false;
} }
if frame_tx.send(Ok(v)).await.is_err() { if frame_tx.send(Ok(v)).await.is_err() {
return; // other end died. return; // other end died.
} }
} }
_ => {} Some(Ok(_)) => {}
} }
} }
}); });
@ -391,12 +349,20 @@ impl Opener for RetinaOpener {
} }
impl RetinaOpener { impl RetinaOpener {
/// Plays to first frame. No timeout; that's the caller's responsibility.
async fn play( async fn play(
url: Url, url: Url,
creds: Option<Credentials>, creds: Option<Credentials>,
) -> Result<(Session<Playing>, VideoParameters), Error> { ) -> Result<
(
Pin<Box<impl futures::Stream<Item = Result<retina::codec::CodecItem, Error>>>>,
VideoParameters,
retina::codec::VideoFrame,
),
Error,
> {
let mut session = retina::client::Session::describe(url, creds).await?; let mut session = retina::client::Session::describe(url, creds).await?;
let (video_i, video_params) = session let (video_i, mut video_params) = session
.streams() .streams()
.iter() .iter()
.enumerate() .enumerate()
@ -407,7 +373,24 @@ impl RetinaOpener {
.ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?;
session.setup(video_i).await?; session.setup(video_i).await?;
let session = session.play(retina::client::PlayPolicy::default()).await?; let session = session.play(retina::client::PlayPolicy::default()).await?;
Ok((session, video_params)) let mut session = Box::pin(session.demuxed()?);
// First frame.
let first_frame = loop {
if let CodecItem::VideoFrame(mut v) = session
.next()
.await
.unwrap_or_else(|| Err(format_err!("stream closed before first frame")))?
{
if let Some(v) = v.new_parameters.take() {
video_params = v;
}
if v.is_random_access_point {
break v;
}
}
};
Ok((session, video_params, first_frame))
} }
} }