upgrade to retina v0.0.5

While I'm here, return a clean error if a non-initial video frame
includes a parameter change, rather than doing something crazy (#42).
It's still broken under ffmpeg, it's untested, and it's not as clean
as seamlessly starting a new recording with the new parameters, but
it's better than nothing.
This commit is contained in:
Scott Lamb 2021-07-08 16:06:30 -07:00
parent f078328935
commit 75e3b85850
3 changed files with 39 additions and 30 deletions

14
server/Cargo.lock generated
View File

@ -1855,15 +1855,14 @@ dependencies = [
[[package]] [[package]]
name = "retina" name = "retina"
version = "0.0.4" version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a2197de61111c2ffd1d4a9c04edf7a883bdbf8ab3e5d7d21fc19fce7547e54f" checksum = "712fc240bf72a74aafd9dfe0f5ff8e16180b3fb4dd3fec5351a8361a89bf8f57"
dependencies = [ dependencies = [
"base64", "base64",
"bitreader", "bitreader",
"bytes", "bytes",
"digest_auth", "digest_auth",
"failure",
"futures", "futures",
"h264-reader", "h264-reader",
"hex", "hex",
@ -1876,6 +1875,7 @@ dependencies = [
"rtsp-types", "rtsp-types",
"sdp", "sdp",
"smallvec", "smallvec",
"thiserror",
"time", "time",
"tokio", "tokio",
"tokio-util", "tokio-util",
@ -2315,18 +2315,18 @@ dependencies = [
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.24" version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.24" version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -46,7 +46,7 @@ nom = "6.0.0"
parking_lot = { version = "0.11.1", features = [] } parking_lot = { version = "0.11.1", features = [] }
protobuf = { git = "https://github.com/stepancheg/rust-protobuf" } protobuf = { git = "https://github.com/stepancheg/rust-protobuf" }
reffers = "0.6.0" reffers = "0.6.0"
retina = "0.0.4" retina = "0.0.5"
ring = "0.16.2" ring = "0.16.2"
rusqlite = "0.25.3" rusqlite = "0.25.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -320,16 +320,24 @@ impl Opener for RetinaOpener {
// Read following frames. // Read following frames.
let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
loop { loop {
let item = tokio::time::timeout_at(deadline, session.next()) match tokio::time::timeout_at(deadline, session.next()).await {
.await Err(_) => {
.unwrap_or_else(|_| Some(Err(format_err!("timeout getting next frame")))); let _ = frame_tx
match item { .send(Err(format_err!("timeout getting next frame")))
Some(Err(e)) => { .await;
let _ = frame_tx.send(Err(e)).await;
return; return;
} }
None => break, Ok(Some(Err(e))) => {
Some(Ok(CodecItem::VideoFrame(v))) => { let _ = frame_tx.send(Err(e.into())).await;
return;
}
Ok(None) => break,
Ok(Some(Ok(CodecItem::VideoFrame(v)))) => {
if let Some(p) = v.new_parameters {
// TODO: we could start a new recording without dropping the connection.
let _ = frame_tx.send(Err(format_err!("parameter; change: {:?}", p)));
return;
}
deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
if v.loss > 0 { if v.loss > 0 {
log::warn!( log::warn!(
@ -343,7 +351,7 @@ impl Opener for RetinaOpener {
return; // other end died. return; // other end died.
} }
} }
Some(Ok(_)) => {} Ok(Some(Ok(_))) => {}
} }
} }
}); });
@ -369,8 +377,8 @@ impl RetinaOpener {
creds: Option<Credentials>, creds: Option<Credentials>,
) -> Result< ) -> Result<
( (
Pin<Box<impl futures::Stream<Item = Result<retina::codec::CodecItem, Error>>>>, Pin<Box<retina::client::Demuxed>>,
VideoParameters, Box<VideoParameters>,
retina::codec::VideoFrame, retina::codec::VideoFrame,
), ),
Error, Error,
@ -381,7 +389,7 @@ impl RetinaOpener {
.iter() .iter()
.enumerate() .enumerate()
.find_map(|(i, s)| match s.parameters() { .find_map(|(i, s)| match s.parameters() {
Some(retina::codec::Parameters::Video(v)) => Some((i, v.clone())), Some(retina::codec::Parameters::Video(v)) => Some((i, Box::new(v.clone()))),
_ => None, _ => None,
}) })
.ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?;
@ -391,17 +399,18 @@ impl RetinaOpener {
// First frame. // First frame.
let first_frame = loop { let first_frame = loop {
if let CodecItem::VideoFrame(mut v) = session match session.next().await {
.next() None => bail!("stream closed before first frame"),
.await Some(Err(e)) => return Err(e.into()),
.unwrap_or_else(|| Err(format_err!("stream closed before first frame")))? Some(Ok(CodecItem::VideoFrame(mut v))) => {
{ if let Some(v) = v.new_parameters.take() {
if let Some(v) = v.new_parameters.take() { video_params = v;
video_params = v; }
} if v.is_random_access_point {
if v.is_random_access_point { break v;
break v; }
} }
Some(Ok(_)) => {}
} }
}; };
Ok((session, video_params, first_frame)) Ok((session, video_params, first_frame))