upgrade to Retina 0.4.0

This commit is contained in:
Scott Lamb 2022-05-17 16:36:39 -07:00
parent 0d2cda5c18
commit 14f70ff4ce
5 changed files with 66 additions and 73 deletions

11
server/Cargo.lock generated
View File

@ -1654,9 +1654,9 @@ dependencies = [
[[package]] [[package]]
name = "retina" name = "retina"
version = "0.3.10" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01358b10b0e442f1cbe1417a888698c88969bfef230290c0aaec65228238a8ca" checksum = "57bdeafed8d429e892754895f028852e666cdf77c3fb248c3780d70606e3e2c7"
dependencies = [ dependencies = [
"base64", "base64",
"bitreader", "bitreader",
@ -1670,7 +1670,6 @@ dependencies = [
"pin-project", "pin-project",
"pretty-hex", "pretty-hex",
"rand", "rand",
"rtp-rs",
"rtsp-types", "rtsp-types",
"sdp-types", "sdp-types",
"smallvec", "smallvec",
@ -1707,12 +1706,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rtp-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355"
[[package]] [[package]]
name = "rtsp-types" name = "rtsp-types"
version = "0.0.3" version = "0.0.3"

View File

@ -47,7 +47,7 @@ parking_lot = "0.12.0"
password-hash = "0.3.2" password-hash = "0.3.2"
protobuf = "3.0" protobuf = "3.0"
reffers = "0.7.0" reffers = "0.7.0"
retina = "0.3.9" retina = "0.4.0"
ring = "0.16.2" ring = "0.16.2"
rusqlite = "0.27.0" rusqlite = "0.27.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -211,17 +211,15 @@ fn press_test_inner(
transport: retina::client::Transport, transport: retina::client::Transport,
) -> Result<String, Error> { ) -> Result<String, Error> {
let _enter = handle.enter(); let _enter = handle.enter();
let stream = stream::OPENER.open( let options = stream::Options {
"test stream".to_owned(), session: retina::client::SessionOptions::default().creds(if username.is_empty() {
url, None
retina::client::SessionOptions::default() } else {
.creds(if username.is_empty() { Some(retina::client::Credentials { username, password })
None }),
} else { setup: retina::client::SetupOptions::default().transport(transport),
Some(retina::client::Credentials { username, password }) };
}) let stream = stream::OPENER.open("test stream".to_owned(), url, options)?;
.transport(transport),
)?;
let video_sample_entry = stream.video_sample_entry(); let video_sample_entry = stream.video_sample_entry();
Ok(format!( Ok(format!(
"codec: {}\n\ "codec: {}\n\

View File

@ -15,18 +15,18 @@ use url::Url;
static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
pub struct Options {
pub session: retina::client::SessionOptions,
pub setup: retina::client::SetupOptions,
}
/// Opens a RTSP stream. This is a trait for test injection. /// Opens a RTSP stream. This is a trait for test injection.
pub trait Opener: Send + Sync { pub trait Opener: Send + Sync {
/// Opens the given RTSP URL. /// Opens the given RTSP URL.
/// ///
/// Note: despite the blocking interface, this expects to be called from /// Note: despite the blocking interface, this expects to be called from
/// the context of a multithreaded tokio runtime with IO and time enabled. /// the context of a multithreaded tokio runtime with IO and time enabled.
fn open( fn open(&self, label: String, url: Url, options: Options) -> Result<Box<dyn Stream>, Error>;
&self,
label: String,
url: Url,
options: retina::client::SessionOptions,
) -> Result<Box<dyn Stream>, Error>;
} }
pub struct VideoFrame { pub struct VideoFrame {
@ -57,9 +57,11 @@ impl Opener for RealOpener {
&self, &self,
label: String, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, mut options: Options,
) -> Result<Box<dyn Stream>, Error> { ) -> Result<Box<dyn Stream>, Error> {
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); options.session = options
.session
.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
let rt_handle = tokio::runtime::Handle::current(); let rt_handle = tokio::runtime::Handle::current();
let (inner, first_frame) = rt_handle let (inner, first_frame) = rt_handle
.block_on(rt_handle.spawn(tokio::time::timeout( .block_on(rt_handle.spawn(tokio::time::timeout(
@ -111,29 +113,16 @@ impl RetinaStreamInner {
async fn play( async fn play(
label: String, label: String,
url: Url, url: Url,
options: retina::client::SessionOptions, options: Options,
) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> { ) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> {
let mut session = retina::client::Session::describe(url, options).await?; let mut session = retina::client::Session::describe(url, options.session).await?;
log::debug!("connected to {:?}, tool {:?}", &label, session.tool()); log::debug!("connected to {:?}, tool {:?}", &label, session.tool());
let (video_i, mut video_params) = session let video_i = session
.streams() .streams()
.iter() .iter()
.enumerate() .position(|s| s.media() == "video" && s.encoding_name() == "h264")
.find_map(|(i, s)| {
if s.media == "video" && s.encoding_name == "h264" {
Some((
i,
s.parameters().and_then(|p| match p {
retina::codec::Parameters::Video(v) => Some(Box::new(v.clone())),
_ => None,
}),
))
} else {
None
}
})
.ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?;
session.setup(video_i).await?; session.setup(video_i, options.setup).await?;
let session = session.play(retina::client::PlayOptions::default()).await?; let session = session.play(retina::client::PlayOptions::default()).await?;
let mut session = session.demuxed()?; let mut session = session.demuxed()?;
@ -142,22 +131,20 @@ impl RetinaStreamInner {
match Pin::new(&mut session).next().await { match Pin::new(&mut session).next().await {
None => bail!("stream closed before first frame"), None => bail!("stream closed before first frame"),
Some(Err(e)) => return Err(e.into()), Some(Err(e)) => return Err(e.into()),
Some(Ok(CodecItem::VideoFrame(mut v))) => { Some(Ok(CodecItem::VideoFrame(v))) => {
if let Some(v) = v.new_parameters.take() { if v.is_random_access_point() {
video_params = Some(v);
}
if v.is_random_access_point {
break v; break v;
} }
} }
Some(Ok(_)) => {} Some(Ok(_)) => {}
} }
}; };
let video_sample_entry = h264::parse_extra_data( let video_params = match session.streams()[video_i].parameters() {
video_params Some(retina::codec::ParametersRef::Video(v)) => v.clone(),
.ok_or_else(|| format_err!("couldn't find H.264 parameters"))? Some(_) => unreachable!(),
.extra_data(), None => bail!("couldn't find H.264 parameters"),
)?; };
let video_sample_entry = h264::parse_extra_data(video_params.extra_data())?;
let self_ = Box::new(Self { let self_ = Box::new(Self {
label, label,
session, session,
@ -169,20 +156,35 @@ impl RetinaStreamInner {
/// Fetches a non-initial frame. /// Fetches a non-initial frame.
async fn fetch_next_frame( async fn fetch_next_frame(
mut self: Box<Self>, mut self: Box<Self>,
) -> Result<(Box<Self>, retina::codec::VideoFrame), Error> { ) -> Result<
(
Box<Self>,
retina::codec::VideoFrame,
Option<retina::codec::VideoParameters>,
),
Error,
> {
loop { loop {
match Pin::new(&mut self.session).next().await.transpose()? { match Pin::new(&mut self.session).next().await.transpose()? {
None => bail!("end of stream"), None => bail!("end of stream"),
Some(CodecItem::VideoFrame(v)) => { Some(CodecItem::VideoFrame(v)) => {
if v.loss > 0 { if v.loss() > 0 {
log::warn!( log::warn!(
"{}: lost {} RTP packets @ {}", "{}: lost {} RTP packets @ {}",
&self.label, &self.label,
v.loss, v.loss(),
v.start_ctx() v.start_ctx()
); );
} }
return Ok((self, v)); let p = if v.has_new_parameters() {
Some(match self.session.streams()[v.stream_id()].parameters() {
Some(retina::codec::ParametersRef::Video(v)) => v.clone(),
_ => unreachable!(),
})
} else {
None
};
return Ok((self, v, p));
} }
Some(_) => {} Some(_) => {}
} }
@ -206,7 +208,7 @@ impl Stream for RetinaStream {
.map(|f| Ok((f, false))) .map(|f| Ok((f, false)))
.unwrap_or_else(move || { .unwrap_or_else(move || {
let inner = self.inner.take().unwrap(); let inner = self.inner.take().unwrap();
let (mut inner, mut frame) = self let (mut inner, frame, new_parameters) = self
.rt_handle .rt_handle
.block_on(self.rt_handle.spawn(tokio::time::timeout( .block_on(self.rt_handle.spawn(tokio::time::timeout(
RETINA_TIMEOUT, RETINA_TIMEOUT,
@ -215,7 +217,7 @@ impl Stream for RetinaStream {
.expect("fetch_next_frame task panicked, see earlier error") .expect("fetch_next_frame task panicked, see earlier error")
.map_err(|_| format_err!("timeout getting next frame"))??; .map_err(|_| format_err!("timeout getting next frame"))??;
let mut new_video_sample_entry = false; let mut new_video_sample_entry = false;
if let Some(p) = frame.new_parameters.take() { if let Some(p) = new_parameters {
let video_sample_entry = h264::parse_extra_data(p.extra_data())?; let video_sample_entry = h264::parse_extra_data(p.extra_data())?;
if video_sample_entry != inner.video_sample_entry { if video_sample_entry != inner.video_sample_entry {
log::debug!( log::debug!(
@ -232,10 +234,10 @@ impl Stream for RetinaStream {
Ok::<_, failure::Error>((frame, new_video_sample_entry)) Ok::<_, failure::Error>((frame, new_video_sample_entry))
})?; })?;
Ok(VideoFrame { Ok(VideoFrame {
pts: frame.timestamp.elapsed(), pts: frame.timestamp().elapsed(),
duration: 0, duration: 0,
is_key: frame.is_random_access_point, is_key: frame.is_random_access_point(),
data: frame.into_data(), data: frame.into_data().into(),
new_video_sample_entry, new_video_sample_entry,
}) })
} }

View File

@ -161,10 +161,8 @@ where
let mut stream = { let mut stream = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open( let options = stream::Options {
self.short_name.clone(), session: retina::client::SessionOptions::default()
self.url.clone(),
retina::client::SessionOptions::default()
.creds(if self.username.is_empty() { .creds(if self.username.is_empty() {
None None
} else { } else {
@ -173,9 +171,11 @@ where
password: self.password.clone(), password: self.password.clone(),
}) })
}) })
.transport(self.transport)
.session_group(self.session_group.clone()), .session_group(self.session_group.clone()),
)? setup: retina::client::SetupOptions::default().transport(self.transport.clone()),
};
self.opener
.open(self.short_name.clone(), self.url.clone(), options)?
}; };
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
let mut video_sample_entry_id = { let mut video_sample_entry_id = {
@ -382,7 +382,7 @@ mod tests {
&self, &self,
_label: String, _label: String,
url: url::Url, url: url::Url,
_options: retina::client::SessionOptions, _options: stream::Options,
) -> Result<Box<dyn stream::Stream>, Error> { ) -> Result<Box<dyn stream::Stream>, Error> {
assert_eq!(&url, &self.expected_url); assert_eq!(&url, &self.expected_url);
let mut l = self.streams.lock(); let mut l = self.streams.lock();