diff --git a/server/Cargo.lock b/server/Cargo.lock index 2a8cc5b..a98be8d 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1855,15 +1855,14 @@ dependencies = [ [[package]] name = "retina" -version = "0.0.4" +version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a2197de61111c2ffd1d4a9c04edf7a883bdbf8ab3e5d7d21fc19fce7547e54f" +checksum = "712fc240bf72a74aafd9dfe0f5ff8e16180b3fb4dd3fec5351a8361a89bf8f57" dependencies = [ "base64", "bitreader", "bytes", "digest_auth", - "failure", "futures", "h264-reader", "hex", @@ -1876,6 +1875,7 @@ dependencies = [ "rtsp-types", "sdp", "smallvec", + "thiserror", "time", "tokio", "tokio-util", @@ -2315,18 +2315,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.24" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" +checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.24" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" +checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745" dependencies = [ "proc-macro2", "quote", diff --git a/server/Cargo.toml b/server/Cargo.toml index 45384bc..9d8db13 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -46,7 +46,7 @@ nom = "6.0.0" parking_lot = { version = "0.11.1", features = [] } protobuf = { git = "https://github.com/stepancheg/rust-protobuf" } reffers = "0.6.0" -retina = "0.0.4" +retina = "0.0.5" ring = "0.16.2" rusqlite = "0.25.3" serde = { version = "1.0", features = ["derive"] } diff --git a/server/src/stream.rs b/server/src/stream.rs index cdaa617..899666f 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -320,16 +320,24 @@ impl Opener for RetinaOpener { // Read following frames. let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; loop { - let item = tokio::time::timeout_at(deadline, session.next()) - .await - .unwrap_or_else(|_| Some(Err(format_err!("timeout getting next frame")))); - match item { - Some(Err(e)) => { - let _ = frame_tx.send(Err(e)).await; + match tokio::time::timeout_at(deadline, session.next()).await { + Err(_) => { + let _ = frame_tx + .send(Err(format_err!("timeout getting next frame"))) + .await; return; } - None => break, - Some(Ok(CodecItem::VideoFrame(v))) => { + Ok(Some(Err(e))) => { + let _ = frame_tx.send(Err(e.into())).await; + return; + } + Ok(None) => break, + Ok(Some(Ok(CodecItem::VideoFrame(v)))) => { + if let Some(p) = v.new_parameters { + // TODO: we could start a new recording without dropping the connection. + let _ = frame_tx.send(Err(format_err!("parameter; change: {:?}", p))); + return; + } deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; if v.loss > 0 { log::warn!( @@ -343,7 +351,7 @@ impl Opener for RetinaOpener { return; // other end died. } } - Some(Ok(_)) => {} + Ok(Some(Ok(_))) => {} } } }); @@ -369,8 +377,8 @@ impl RetinaOpener { creds: Option, ) -> Result< ( - Pin>>>, - VideoParameters, + Pin>, + Box, retina::codec::VideoFrame, ), Error, @@ -381,7 +389,7 @@ impl RetinaOpener { .iter() .enumerate() .find_map(|(i, s)| match s.parameters() { - Some(retina::codec::Parameters::Video(v)) => Some((i, v.clone())), + Some(retina::codec::Parameters::Video(v)) => Some((i, Box::new(v.clone()))), _ => None, }) .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; @@ -391,17 +399,18 @@ impl RetinaOpener { // First frame. let first_frame = loop { - if let CodecItem::VideoFrame(mut v) = session - .next() - .await - .unwrap_or_else(|| Err(format_err!("stream closed before first frame")))? - { - if let Some(v) = v.new_parameters.take() { - video_params = v; - } - if v.is_random_access_point { - break v; + match session.next().await { + None => bail!("stream closed before first frame"), + Some(Err(e)) => return Err(e.into()), + Some(Ok(CodecItem::VideoFrame(mut v))) => { + if let Some(v) = v.new_parameters.take() { + video_params = v; + } + if v.is_random_access_point { + break v; + } } + Some(Ok(_)) => {} } }; Ok((session, video_params, first_frame))