diff --git a/server/src/stream.rs b/server/src/stream.rs index a409163..9d8c3ea 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -9,15 +9,18 @@ use failure::{bail, Error}; use futures::StreamExt; use lazy_static::lazy_static; use log::warn; -use retina::client::{Credentials, Playing, Session}; +use retina::client::Credentials; use retina::codec::{CodecItem, VideoParameters}; use std::convert::TryFrom; use std::ffi::CString; +use std::pin::Pin; use std::result::Result; use url::Url; static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); +static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + lazy_static! { pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); } @@ -289,89 +292,44 @@ impl Opener for RetinaOpener { // TODO: connection timeout. handle.spawn(async move { - let (session, mut video_params) = match RetinaOpener::play(url, creds).await { - Err(e) => { - let _ = startup_tx.send(Err(e)); - return; - } - Ok((s, v)) => (s, v), - }; - let session = match session.demuxed() { - Ok(s) => s, - Err(e) => { - let _ = startup_tx.send(Err(e)); - return; - } - }; - tokio::pin!(session); - - // First frame. - loop { - match session.next().await { - Some(Err(e)) => { + let r = tokio::time::timeout(RETINA_TIMEOUT, RetinaOpener::play(url, creds)).await; + let (mut session, video_params, first_frame) = + match r.unwrap_or_else(|_| Err(format_err!("timeout opening stream"))) { + Err(e) => { let _ = startup_tx.send(Err(e)); return; } - Some(Ok(CodecItem::VideoFrame(mut v))) => { - if let Some(v) = v.new_parameters.take() { - video_params = v; - } - if v.is_random_access_point { - if startup_tx.send(Ok(video_params)).is_err() { - return; - } - if frame_tx.send(Ok(v)).await.is_err() { - return; - } - break; - } - } - Some(Ok(_)) => {} - None => { - let _ = - startup_tx.send(Err(format_err!("stream closed before first frame"))); - return; - } - } + Ok((s, p, f)) => (s, p, f), + }; + if startup_tx.send(Ok(video_params)).is_err() { + return; + } + if frame_tx.send(Ok(first_frame)).await.is_err() { + return; } - // Following frames. - let mut need_key_frame = false; - while let Some(item) = session.next().await { + // Read following frames. + let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; + loop { + let item = tokio::time::timeout_at(deadline, session.next()) + .await + .unwrap_or_else(|_| Some(Err(format_err!("timeout getting next frame")))); match item { - Err(e) => { + Some(Err(e)) => { let _ = frame_tx.send(Err(e)).await; return; } - Ok(CodecItem::VideoFrame(v)) => { + None => break, + Some(Ok(CodecItem::VideoFrame(v))) => { + deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; if v.loss > 0 { - if !v.is_random_access_point { - log::info!( - "lost {} RTP packets; waiting for next key frame @ {:?}", - v.loss, - v.start_ctx() - ); - need_key_frame = true; - continue; - } else { - log::info!( - "lost {} RTP packets; already have key frame @ {:?}", - v.loss, - v.start_ctx() - ); - need_key_frame = false; - } - } else if need_key_frame && !v.is_random_access_point { - continue; - } else if need_key_frame { - log::info!("recovering from loss with key frame @ {:?}", v.start_ctx()); - need_key_frame = false; + log::warn!("lost {} RTP packets @ {:?}", v.loss, v.start_ctx()); } if frame_tx.send(Ok(v)).await.is_err() { return; // other end died. } } - _ => {} + Some(Ok(_)) => {} } } }); @@ -391,12 +349,20 @@ impl Opener for RetinaOpener { } impl RetinaOpener { + /// Plays to first frame. No timeout; that's the caller's responsibility. async fn play( url: Url, creds: Option, - ) -> Result<(Session, VideoParameters), Error> { + ) -> Result< + ( + Pin>>>, + VideoParameters, + retina::codec::VideoFrame, + ), + Error, + > { let mut session = retina::client::Session::describe(url, creds).await?; - let (video_i, video_params) = session + let (video_i, mut video_params) = session .streams() .iter() .enumerate() @@ -407,7 +373,24 @@ impl RetinaOpener { .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; session.setup(video_i).await?; let session = session.play(retina::client::PlayPolicy::default()).await?; - Ok((session, video_params)) + let mut session = Box::pin(session.demuxed()?); + + // First frame. + let first_frame = loop { + if let CodecItem::VideoFrame(mut v) = session + .next() + .await + .unwrap_or_else(|| Err(format_err!("stream closed before first frame")))? + { + if let Some(v) = v.new_parameters.take() { + video_params = v; + } + if v.is_random_access_point { + break v; + } + } + }; + Ok((session, video_params, first_frame)) } }