support --rtsp-library=retina (#37)

This isn't well-tested and doesn't yet support an initial connection
timeout. But in a quick test, it successfully returns video!

I'd like to do some more aggressive code restructuring for zero-copy
and to have only one writer thread per sample file directory (rather
than the syncer thread + one writer thread per RTSP stream). But I'll
likely wait until I drop support for ffmpeg entirely.
This commit is contained in:
Scott Lamb 2021-06-07 14:36:53 -07:00
parent 7699696bd9
commit 032bd76577
10 changed files with 582 additions and 200 deletions

125
server/Cargo.lock generated
View File

@ -83,6 +83,27 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -322,6 +343,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "cookie-factory"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b"
[[package]]
name = "cpufeatures"
version = "0.1.4"
@ -507,6 +534,19 @@ dependencies = [
"generic-array",
]
[[package]]
name = "digest_auth"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa30657988b2ced88f68fe490889e739bf98d342916c33ed3100af1d6f1cbc9c"
dependencies = [
"digest",
"hex",
"md-5",
"rand",
"sha2",
]
[[package]]
name = "dirs"
version = "1.0.5"
@ -862,6 +902,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.11.0"
@ -1120,6 +1166,17 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "md-5"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
dependencies = [
"block-buffer",
"digest",
"opaque-debug",
]
[[package]]
name = "memchr"
version = "2.4.0"
@ -1260,6 +1317,7 @@ dependencies = [
"protobuf",
"reffers",
"reqwest",
"retina",
"ring",
"rusqlite",
"serde",
@ -1812,6 +1870,35 @@ dependencies = [
"winreg",
]
[[package]]
name = "retina"
version = "0.0.1"
source = "git+https://github.com/scottlamb/retina?branch=main#1bcc864344cf54fdb691a0e11669a663d3c1d7c9"
dependencies = [
"async-stream",
"base64",
"bitreader",
"bytes",
"digest_auth",
"failure",
"futures",
"h264-reader",
"hex",
"log",
"once_cell",
"pin-project",
"pretty-hex",
"rtcp",
"rtp-rs",
"rtsp-types",
"sdp",
"smallvec",
"time",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "rfc6381-codec"
version = "0.1.0"
@ -1848,6 +1935,33 @@ dependencies = [
"winapi",
]
[[package]]
name = "rtcp"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5fb4431b04a948fd91622a75d65a95da3ed2f0be26c902f3d027a23b78fbc96"
dependencies = [
"bytes",
"thiserror",
]
[[package]]
name = "rtp-rs"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1110d695193d446e901de09921ffbf2d86ae351bbfde9c5b53863ce177e17f5"
[[package]]
name = "rtsp-types"
version = "0.0.2"
source = "git+https://github.com/sdroege/rtsp-types#53bdf9a74175946572eb3509a6343696724123eb"
dependencies = [
"cookie-factory",
"nom",
"tinyvec",
"url",
]
[[package]]
name = "rusqlite"
version = "0.25.3"
@ -1922,6 +2036,17 @@ dependencies = [
"sha2",
]
[[package]]
name = "sdp"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f14aa4ddf1473d50e1664f5c353017981f43f27ede9136a54f60c4bb56d8b152"
dependencies = [
"rand",
"thiserror",
"url",
]
[[package]]
name = "serde"
version = "1.0.126"

View File

@ -45,6 +45,7 @@ nom = "6.0.0"
parking_lot = { version = "0.11.1", features = [] }
protobuf = { git = "https://github.com/stepancheg/rust-protobuf" }
reffers = "0.6.0"
retina = { git = "https://github.com/scottlamb/retina", branch = "main" }
ring = "0.16.2"
rusqlite = "0.25.3"
serde = { version = "1.0", features = ["derive"] }

View File

@ -350,8 +350,8 @@ pub struct Camera {
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: String,
pub password: String,
pub username: Option<String>,
pub password: Option<String>,
pub streams: [Option<i32>; 2],
}
@ -500,8 +500,8 @@ pub struct CameraChange {
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: String,
pub password: String,
pub username: Option<String>,
pub password: Option<String>,
/// `StreamType t` is represented by `streams[t.index()]`. A default StreamChange will
/// correspond to no stream in the database, provided there are no existing recordings for that
@ -2361,8 +2361,8 @@ mod tests {
camera_id = row.id;
assert_eq!(uuid, row.uuid);
assert_eq!("test-camera", row.onvif_host);
assert_eq!("foo", row.username);
assert_eq!("bar", row.password);
assert_eq!(Some("foo"), row.username.as_deref());
assert_eq!(Some("bar"), row.password.as_deref());
//assert_eq!("/main", row.main_rtsp_url);
//assert_eq!("/sub", row.sub_rtsp_url);
//assert_eq!(42, row.retain_bytes);
@ -2513,8 +2513,8 @@ mod tests {
short_name: "testcam".to_owned(),
description: "".to_owned(),
onvif_host: "test-camera".to_owned(),
username: "foo".to_owned(),
password: "bar".to_owned(),
username: Some("foo".to_owned()),
password: Some("bar".to_owned()),
streams: [
StreamChange {
sample_file_dir_id: Some(sample_file_dir_id),

View File

@ -84,8 +84,8 @@ impl<C: Clocks + Clone> TestDb<C> {
short_name: "test camera".to_owned(),
description: "".to_owned(),
onvif_host: "test-camera".to_owned(),
username: "foo".to_owned(),
password: "bar".to_owned(),
username: Some("foo".to_owned()),
password: Some("bar".to_owned()),
streams: [
db::StreamChange {
sample_file_dir_id: Some(sample_file_dir_id),

View File

@ -2,7 +2,7 @@
// Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::stream::{self, Opener, Stream};
use crate::stream::{self, Opener};
use base::strutil::{decode_size, encode_size};
use cursive::traits::{Boxable, Finder, Identifiable};
use cursive::views;
@ -35,24 +35,30 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange {
.get_content()
.as_str()
.into();
let u = siv
let username = match siv
.find_name::<views::EditView>("username")
.unwrap()
.get_content()
.as_str()
.into();
let p = siv
{
"" => None,
u => Some(u.to_owned()),
};
let password = match siv
.find_name::<views::EditView>("password")
.unwrap()
.get_content()
.as_str()
.into();
{
"" => None,
p => Some(p.to_owned()),
};
let mut c = db::CameraChange {
short_name: sn,
description: d,
onvif_host: h,
username: u,
password: p,
username,
password,
streams: Default::default(),
};
for &t in &db::ALL_STREAM_TYPES {
@ -114,12 +120,16 @@ fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, id: Option<i32>) {
}
}
fn press_test_inner(url: &Url) -> Result<String, Error> {
let stream = stream::FFMPEG.open(stream::Source::Rtsp {
url: url.as_str(),
redacted_url: url.as_str(), // don't need redaction in config UI.
fn press_test_inner(
url: Url,
username: Option<String>,
password: Option<String>,
) -> Result<String, Error> {
let (extra_data, _stream) = stream::FFMPEG.open(stream::Source::Rtsp {
url,
username,
password,
})?;
let extra_data = stream.get_extra_data()?;
Ok(format!(
"{}x{} video stream",
extra_data.entry.width, extra_data.entry.height
@ -128,7 +138,7 @@ fn press_test_inner(url: &Url) -> Result<String, Error> {
fn press_test(siv: &mut Cursive, t: db::StreamType) {
let c = get_change(siv);
let mut url = match Url::parse(&c.streams[t.index()].rtsp_url) {
let url = match Url::parse(&c.streams[t.index()].rtsp_url) {
Ok(u) => u,
Err(e) => {
siv.add_layer(
@ -139,11 +149,9 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) {
return;
}
};
let username = c.username;
let password = c.password;
if !c.username.is_empty() {
let _ = url.set_username(&c.username);
let _ = url.set_password(Some(&c.password));
}
siv.add_layer(
views::Dialog::text(format!(
"Testing {} stream at {}. This may take a while \
@ -159,7 +167,7 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) {
siv.set_fps(5);
let sink = siv.cb_sink().clone();
::std::thread::spawn(move || {
let r = press_test_inner(&url);
let r = press_test_inner(url.clone(), username, password);
sink.send(Box::new(move |siv: &mut Cursive| {
// Polling is no longer necessary.
siv.set_fps(0);
@ -442,8 +450,8 @@ fn edit_camera_dialog(db: &Arc<db::Database>, siv: &mut Cursive, item: &Option<i
for &(view_id, content) in &[
("short_name", &*camera.short_name),
("onvif_host", &*camera.onvif_host),
("username", &*camera.username),
("password", &*camera.password),
("username", camera.username.as_deref().unwrap_or("")),
("password", camera.password.as_deref().unwrap_or("")),
] {
dialog
.call_on_name(view_id, |v: &mut views::EditView| {

View File

@ -2,7 +2,6 @@
// Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::stream;
use crate::streamer;
use crate::web;
use base::clock;
@ -65,6 +64,14 @@ pub struct Args {
/// --http-addr=127.0.0.1:8080.
#[structopt(long)]
trust_forward_hdrs: bool,
/// RTSP library to use for fetching the cameras' video stream.
/// Moonfire NVR is in the process of switching from `ffmpeg` (the current
/// default, used since the beginning of the project) to `retina` (a
/// pure-Rust RTSP library developed by Moonfire NVR's author). `retina`
/// is still experimental.
#[structopt(long, default_value = "ffmpeg", parse(try_from_str))]
rtsp_library: crate::stream::RtspLibrary,
}
// These are used in a hack to get the name of the current time zone (e.g. America/Los_Angeles).
@ -203,7 +210,7 @@ pub async fn run(args: &Args) -> Result<i32, Error> {
let streams = l.streams_by_id().len();
let env = streamer::Environment {
db: &db,
opener: &*stream::FFMPEG,
opener: args.rtsp_library.opener(),
shutdown: &shutdown_streamers,
};
@ -227,6 +234,7 @@ pub async fn run(args: &Args) -> Result<i32, Error> {
}
// Then start up streams.
let handle = tokio::runtime::Handle::current();
let l = db.lock();
for (i, (id, stream)) in l.streams_by_id().iter().enumerate() {
if !stream.record {
@ -259,10 +267,12 @@ pub async fn run(args: &Args) -> Result<i32, Error> {
)?;
info!("Starting streamer for {}", streamer.short_name());
let name = format!("s-{}", streamer.short_name());
let handle = handle.clone();
streamers.push(
thread::Builder::new()
.name(name)
.spawn(move || {
let _enter = handle.enter();
streamer.run();
})
.expect("can't create thread"),

View File

@ -70,8 +70,8 @@ pub struct Camera<'a> {
#[serde(rename_all = "camelCase")]
pub struct CameraConfig<'a> {
pub onvif_host: &'a str,
pub username: &'a str,
pub password: &'a str,
pub username: Option<&'a str>,
pub password: Option<&'a str>,
}
#[derive(Debug, Serialize)]
@ -191,8 +191,8 @@ impl<'a> Camera<'a> {
false => None,
true => Some(CameraConfig {
onvif_host: &c.onvif_host,
username: &c.username,
password: &c.password,
username: c.username.as_deref(),
password: c.password.as_deref(),
}),
},
streams: [

View File

@ -1971,7 +1971,7 @@ impl fmt::Debug for File {
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::{self, Opener, Stream};
use crate::stream::{self, Opener};
use base::clock::RealClocks;
use byteorder::{BigEndian, ByteOrder};
use db::recording::{self, TIME_UNITS_PER_SEC};
@ -2255,13 +2255,12 @@ mod tests {
}
fn copy_mp4_to_db(db: &TestDb<RealClocks>) {
let mut input = stream::FFMPEG
let (extra_data, mut input) = stream::FFMPEG
.open(stream::Source::File("src/testdata/clip.mp4"))
.unwrap();
// 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let extra_data = input.get_extra_data().unwrap();
let video_sample_entry_id = db
.db
.lock()
@ -2286,26 +2285,20 @@ mod tests {
let mut frame_time = START_TIME;
loop {
let pkt = match input.get_next() {
let pkt = match input.next() {
Ok(p) => p,
Err(e) if e.is_eof() => {
Err(e) if e.to_string().contains("End of file") => {
break;
}
Err(e) => {
panic!("unexpected input error: {}", e);
}
};
let pts = pkt.pts().unwrap();
frame_time += recording::Duration(pkt.duration() as i64);
frame_time += recording::Duration(i64::from(pkt.duration));
output
.write(
pkt.data().expect("packet without data"),
frame_time,
pts,
pkt.is_key(),
)
.write(pkt.data, frame_time, pkt.pts, pkt.is_key)
.unwrap();
end_pts = Some(pts + pkt.duration() as i64);
end_pts = Some(pkt.pts + i64::from(pkt.duration));
}
output.close(end_pts).unwrap();
db.syncer_channel.flush();
@ -2373,28 +2366,25 @@ mod tests {
}
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let mut orig = stream::FFMPEG
let (orig_extra_data, mut orig) = stream::FFMPEG
.open(stream::Source::File("src/testdata/clip.mp4"))
.unwrap();
let mut new = stream::FFMPEG
let (new_extra_data, mut new) = stream::FFMPEG
.open(stream::Source::File(new_filename))
.unwrap();
assert_eq!(
orig.get_extra_data().unwrap(),
new.get_extra_data().unwrap()
);
assert_eq!(orig_extra_data, new_extra_data);
let mut final_durations = None;
loop {
let orig_pkt = match orig.get_next() {
let orig_pkt = match orig.next() {
Ok(p) => Some(p),
Err(e) if e.is_eof() => None,
Err(e) if e.to_string() == "End of file" => None,
Err(e) => {
panic!("unexpected input error: {}", e);
}
};
let new_pkt = match new.get_next() {
let new_pkt = match new.next() {
Ok(p) => Some(p),
Err(e) if e.is_eof() => {
Err(e) if e.to_string() == "End of file" => {
break;
}
Err(e) => {
@ -2406,11 +2396,10 @@ mod tests {
(None, None) => break,
(o, n) => panic!("orig: {} new: {}", o.is_some(), n.is_some()),
};
assert_eq!(orig_pkt.pts().unwrap(), new_pkt.pts().unwrap() + pts_offset);
assert_eq!(orig_pkt.dts(), new_pkt.dts() + pts_offset);
assert_eq!(orig_pkt.data(), new_pkt.data());
assert_eq!(orig_pkt.is_key(), new_pkt.is_key());
final_durations = Some((orig_pkt.duration() as i64, new_pkt.duration() as i64));
assert_eq!(orig_pkt.pts, new_pkt.pts + pts_offset);
assert_eq!(orig_pkt.data, new_pkt.data);
assert_eq!(orig_pkt.is_key, new_pkt.is_key);
final_durations = Some((i64::from(orig_pkt.duration), i64::from(new_pkt.duration)));
}
if let Some((orig_dur, new_dur)) = final_durations {

View File

@ -3,52 +3,108 @@
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::h264;
use bytes::Buf;
use cstr::cstr;
use failure::format_err;
use failure::{bail, Error};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::warn;
use retina::client::{Credentials, Playing, Session};
use retina::codec::{CodecItem, VideoParameters};
use std::convert::TryFrom;
use std::ffi::CString;
use std::num::NonZeroU32;
use std::result::Result;
use url::Url;
static START: parking_lot::Once = parking_lot::Once::new();
static START_FFMPEG: parking_lot::Once = parking_lot::Once::new();
lazy_static! {
pub static ref FFMPEG: Ffmpeg = Ffmpeg::new();
}
pub enum RtspLibrary {
Ffmpeg,
Retina,
}
impl std::str::FromStr for RtspLibrary {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"ffmpeg" => RtspLibrary::Ffmpeg,
"retina" => RtspLibrary::Retina,
_ => bail!("unknown RTSP library {:?}", s),
})
}
}
impl RtspLibrary {
pub fn opener(&self) -> &'static dyn Opener {
match self {
RtspLibrary::Ffmpeg => &*FFMPEG,
RtspLibrary::Retina => &RETINA,
}
}
}
#[cfg(test)]
pub enum Source<'a> {
/// A filename, for testing.
#[cfg(test)]
File(&'a str),
/// An RTSP stream, for production use.
Rtsp { url: &'a str, redacted_url: &'a str },
Rtsp {
url: Url,
username: Option<String>,
password: Option<String>,
},
}
pub trait Opener<S: Stream>: Sync {
fn open(&self, src: Source) -> Result<S, Error>;
#[cfg(not(test))]
pub enum Source {
/// An RTSP stream, for production use.
Rtsp {
url: Url,
username: Option<String>,
password: Option<String>,
},
}
pub trait Stream {
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_>;
fn get_extra_data(&self) -> Result<h264::ExtraData, Error>;
fn get_next(&mut self) -> Result<ffmpeg::avcodec::Packet, ffmpeg::Error>;
pub trait Opener: Send + Sync {
fn open(&self, src: Source) -> Result<(h264::ExtraData, Box<dyn Stream>), Error>;
}
pub struct VideoFrame<'a> {
pub pts: i64,
/// An estimate of the duration of the frame, or zero.
/// This can be deceptive and is only used by some testing code.
pub duration: i32,
pub is_key: bool,
pub data: &'a [u8],
}
pub trait Stream: Send {
fn next(&mut self) -> Result<VideoFrame, Error>;
}
pub struct Ffmpeg {}
impl Ffmpeg {
fn new() -> Ffmpeg {
START.call_once(|| {
START_FFMPEG.call_once(|| {
ffmpeg::Ffmpeg::new();
});
Ffmpeg {}
}
}
impl Opener<FfmpegStream> for Ffmpeg {
fn open(&self, src: Source) -> Result<FfmpegStream, Error> {
impl Opener for Ffmpeg {
fn open(&self, src: Source) -> Result<(h264::ExtraData, Box<dyn Stream>), Error> {
use ffmpeg::avformat::InputFormatContext;
let mut input = match src {
#[cfg(test)]
@ -72,7 +128,11 @@ impl Opener<FfmpegStream> for Ffmpeg {
}
i
}
Source::Rtsp { url, redacted_url } => {
Source::Rtsp {
url,
username,
password,
} => {
let mut open_options = ffmpeg::avutil::Dictionary::new();
open_options
.set(cstr!("rtsp_transport"), cstr!("tcp"))
@ -98,11 +158,23 @@ impl Opener<FfmpegStream> for Ffmpeg {
.set(cstr!("allowed_media_types"), cstr!("video"))
.unwrap();
let i = InputFormatContext::open(&CString::new(url).unwrap(), &mut open_options)?;
let mut url_with_credentials = url.clone();
if let Some(u) = username.as_deref() {
url_with_credentials
.set_username(u)
.map_err(|_| format_err!("unable to set username on url {}", url))?;
}
url_with_credentials
.set_password(password.as_deref())
.map_err(|_| format_err!("unable to set password on url {}", url))?;
let i = InputFormatContext::open(
&CString::new(url_with_credentials.as_str())?,
&mut open_options,
)?;
if !open_options.empty() {
warn!(
"While opening URL {}, some options were not understood: {}",
redacted_url, open_options
url, open_options
);
}
i
@ -127,22 +199,12 @@ impl Opener<FfmpegStream> for Ffmpeg {
None => bail!("no video stream"),
};
Ok(FfmpegStream { input, video_i })
let video = input.streams().get(video_i);
let codec = video.codecpar();
let codec_id = codec.codec_id();
if !codec_id.is_h264() {
bail!("stream's video codec {:?} is not h264", codec_id);
}
}
pub struct FfmpegStream {
input: ffmpeg::avformat::InputFormatContext<'static>,
video_i: usize,
}
impl Stream for FfmpegStream {
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters {
self.input.streams().get(self.video_i).codecpar()
}
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
let video = self.input.streams().get(self.video_i);
let tb = video.time_base();
if tb.num != 1 || tb.den != 90000 {
bail!(
@ -151,25 +213,234 @@ impl Stream for FfmpegStream {
tb.den
);
}
let codec = video.codecpar();
let codec_id = codec.codec_id();
if !codec_id.is_h264() {
bail!("stream's video codec {:?} is not h264", codec_id);
}
let dims = codec.dims();
h264::ExtraData::parse(
let extra_data = h264::ExtraData::parse(
codec.extradata(),
u16::try_from(dims.width)?,
u16::try_from(dims.height)?,
)
}
fn get_next(&mut self) -> Result<ffmpeg::avcodec::Packet, ffmpeg::Error> {
loop {
let p = self.input.read_frame()?;
if p.stream_index() == self.video_i {
return Ok(p);
}
}
)?;
let need_transform = extra_data.need_transform;
let stream = Box::new(FfmpegStream {
input,
video_i,
data: Vec::new(),
need_transform,
});
Ok((extra_data, stream))
}
}
struct FfmpegStream {
input: ffmpeg::avformat::InputFormatContext<'static>,
video_i: usize,
data: Vec<u8>,
need_transform: bool,
}
impl Stream for FfmpegStream {
fn next(&mut self) -> Result<VideoFrame, Error> {
let pkt = loop {
let pkt = self.input.read_frame()?;
if pkt.stream_index() == self.video_i {
break pkt;
}
};
let data = pkt
.data()
.ok_or_else(|| format_err!("packet with no data"))?;
if self.need_transform {
h264::transform_sample_data(data, &mut self.data)?;
} else {
// This copy isn't strictly necessary, but this path is only taken in testing anyway.
self.data.clear();
self.data.extend_from_slice(data);
}
let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?;
Ok(VideoFrame {
pts,
is_key: pkt.is_key(),
duration: pkt.duration(),
data: &self.data,
})
}
}
pub struct RetinaOpener {}
pub const RETINA: RetinaOpener = RetinaOpener {};
impl Opener for RetinaOpener {
fn open(&self, src: Source) -> Result<(h264::ExtraData, Box<dyn Stream>), Error> {
let (startup_tx, startup_rx) = tokio::sync::oneshot::channel();
let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::runtime::Handle::current();
let (url, username, password) = match src {
#[cfg(test)]
Source::File(_) => bail!("Retina doesn't support .mp4 files"),
Source::Rtsp {
url,
username,
password,
} => (url, username, password),
};
let creds = match (username, password) {
(None, None) => None,
(Some(username), Some(password)) => Some(Credentials { username, password }),
_ => bail!("expected username and password together"),
};
// TODO: connection timeout.
handle.spawn(async move {
let (session, mut video_params) = match RetinaOpener::play(url, creds).await {
Err(e) => {
let _ = startup_tx.send(Err(e));
return;
}
Ok((s, v)) => (s, v),
};
let session = match session.demuxed() {
Ok(s) => s,
Err(e) => {
let _ = startup_tx.send(Err(e));
return;
}
};
tokio::pin!(session);
// First frame.
loop {
match session.next().await {
Some(Err(e)) => {
let _ = startup_tx.send(Err(e));
return;
}
Some(Ok(CodecItem::VideoFrame(mut v))) => {
if let Some(v) = v.new_parameters.take() {
video_params = v;
}
if v.is_random_access_point {
if startup_tx.send(Ok(video_params)).is_err() {
return;
}
if frame_tx.send(Ok(v)).await.is_err() {
return;
}
break;
}
}
Some(Ok(_)) => {}
None => {
let _ =
startup_tx.send(Err(format_err!("stream closed before first frame")));
return;
}
}
}
// Following frames.
let mut need_key_frame = false;
while let Some(item) = session.next().await {
match item {
Err(e) => {
let _ = frame_tx.send(Err(e)).await;
return;
}
Ok(CodecItem::VideoFrame(v)) => {
if v.loss > 0 {
if !v.is_random_access_point {
log::info!(
"lost {} RTP packets; waiting for next key frame @ {:?}",
v.loss,
v.start_ctx()
);
need_key_frame = true;
continue;
} else {
log::info!(
"lost {} RTP packets; already have key frame @ {:?}",
v.loss,
v.start_ctx()
);
need_key_frame = false;
}
} else if need_key_frame && !v.is_random_access_point {
continue;
} else if need_key_frame {
log::info!("recovering from loss with key frame @ {:?}", v.start_ctx());
need_key_frame = false;
}
if frame_tx.send(Ok(v)).await.is_err() {
return; // other end died.
}
}
_ => {}
}
}
});
let video_params = handle.block_on(startup_rx)??;
let dims = video_params.pixel_dimensions();
let extra_data = h264::ExtraData::parse(
video_params.extra_data(),
u16::try_from(dims.0)?,
u16::try_from(dims.1)?,
)?;
let stream = Box::new(RetinaStream {
frame_rx,
data: Vec::new(),
});
Ok((extra_data, stream))
}
}
impl RetinaOpener {
async fn play(
url: Url,
creds: Option<Credentials>,
) -> Result<(Session<Playing>, VideoParameters), Error> {
let mut session = retina::client::Session::describe(url, creds).await?;
let (video_i, video_params) = session
.streams()
.iter()
.enumerate()
.find_map(|(i, s)| match s.parameters() {
Some(retina::codec::Parameters::Video(v)) => Some((i, v.clone())),
_ => None,
})
.ok_or_else(|| format_err!("couldn't find H.264 video stream"))?;
session.setup(video_i).await?;
let session = session
.play(
retina::client::PlayPolicy::default()
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)
.await?;
Ok((session, video_params))
}
}
struct RetinaStream {
frame_rx: tokio::sync::mpsc::Receiver<Result<retina::codec::VideoFrame, Error>>,
data: Vec<u8>,
}
impl Stream for RetinaStream {
fn next(&mut self) -> Result<VideoFrame, Error> {
let mut frame = self
.frame_rx
.blocking_recv()
.ok_or_else(|| format_err!("stream ended"))??;
self.data.clear();
while frame.has_remaining() {
let chunk = frame.chunk();
self.data.extend_from_slice(chunk);
let len = chunk.len();
frame.advance(len);
}
Ok(VideoFrame {
pts: frame.timestamp.elapsed(),
duration: 0,
is_key: frame.is_random_access_point,
data: &self.data,
})
}
}

View File

@ -2,11 +2,10 @@
// Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::h264;
use crate::stream;
use base::clock::{Clocks, TimerGuard};
use db::{dir, recording, writer, Camera, Database, Stream};
use failure::{bail, format_err, Error};
use failure::{bail, Error};
use log::{debug, info, trace, warn};
use std::result::Result;
use std::sync::atomic::{AtomicBool, Ordering};
@ -16,22 +15,20 @@ use url::Url;
pub static ROTATE_INTERVAL_SEC: i64 = 60;
/// Common state that can be used by multiple `Streamer` instances.
pub struct Environment<'a, 'b, C, S>
pub struct Environment<'a, 'tmp, C>
where
C: Clocks + Clone,
S: 'a + stream::Stream,
{
pub opener: &'a dyn stream::Opener<S>,
pub db: &'b Arc<Database<C>>,
pub shutdown: &'b Arc<AtomicBool>,
pub opener: &'a dyn stream::Opener,
pub db: &'tmp Arc<Database<C>>,
pub shutdown: &'tmp Arc<AtomicBool>,
}
/// Connects to a given RTSP stream and writes recordings to the database via [`writer::Writer`].
/// Streamer is meant to be long-lived; it will sleep and retry after each failure.
pub struct Streamer<'a, C, S>
pub struct Streamer<'a, C>
where
C: Clocks + Clone,
S: 'a + stream::Stream,
{
shutdown: Arc<AtomicBool>,
@ -41,20 +38,20 @@ where
db: Arc<Database<C>>,
dir: Arc<dir::SampleFileDir>,
syncer_channel: writer::SyncerChannel<::std::fs::File>,
opener: &'a dyn stream::Opener<S>,
opener: &'a dyn stream::Opener,
stream_id: i32,
short_name: String,
url: Url,
redacted_url: Url,
username: Option<String>,
password: Option<String>,
}
impl<'a, C, S> Streamer<'a, C, S>
impl<'a, C> Streamer<'a, C>
where
C: 'a + Clocks + Clone,
S: 'a + stream::Stream,
{
pub fn new<'b>(
env: &Environment<'a, 'b, C, S>,
pub fn new<'tmp>(
env: &Environment<'a, 'tmp, C>,
dir: Arc<dir::SampleFileDir>,
syncer_channel: writer::SyncerChannel<::std::fs::File>,
stream_id: i32,
@ -63,14 +60,9 @@ where
rotate_offset_sec: i64,
rotate_interval_sec: i64,
) -> Result<Self, Error> {
let mut url = Url::parse(&s.rtsp_url)?;
let mut redacted_url = url.clone();
if !c.username.is_empty() {
url.set_username(&c.username)
.map_err(|_| format_err!("can't set username"))?;
redacted_url.set_username(&c.username).unwrap();
url.set_password(Some(&c.password)).unwrap();
redacted_url.set_password(Some("redacted")).unwrap();
let url = Url::parse(&s.rtsp_url)?;
if !url.username().is_empty() || url.password().is_some() {
bail!("RTSP URL shouldn't include credentials");
}
Ok(Streamer {
shutdown: env.shutdown.clone(),
@ -83,7 +75,8 @@ where
stream_id,
short_name: format!("{}-{}", c.short_name, s.type_.as_str()),
url,
redacted_url,
username: c.username.clone(),
password: c.password.clone(),
})
}
@ -91,6 +84,9 @@ where
&self.short_name
}
/// Runs the streamer; blocks.
/// Note that when using Retina as the RTSP library, this must be called
/// within a tokio runtime context; see [tokio::runtime::Handle].
pub fn run(&mut self) {
while !self.shutdown.load(Ordering::SeqCst) {
if let Err(e) = self.run_once() {
@ -108,18 +104,18 @@ where
}
fn run_once(&mut self) -> Result<(), Error> {
info!("{}: Opening input: {}", self.short_name, self.redacted_url);
info!("{}: Opening input: {}", self.short_name, self.url.as_str());
let clocks = self.db.clocks();
let mut stream = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.redacted_url));
let (extra_data, mut stream) = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open(stream::Source::Rtsp {
url: self.url.as_str(),
redacted_url: self.redacted_url.as_str(),
url: self.url.clone(),
username: self.username.clone(),
password: self.password.clone(),
})?
};
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
let extra_data = stream.get_extra_data()?;
let video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db.lock().insert_video_sample_entry(extra_data.entry)?
@ -128,7 +124,6 @@ where
// Seconds since epoch at which to next rotate.
let mut rotate: Option<i64> = None;
let mut transformed = Vec::new();
let mut w = writer::Writer::new(
&self.dir,
&self.db,
@ -139,10 +134,9 @@ where
while !self.shutdown.load(Ordering::SeqCst) {
let pkt = {
let _t = TimerGuard::new(&clocks, || "getting next packet");
stream.get_next()?
stream.next()?
};
let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?;
if !seen_key_frame && !pkt.is_key() {
if !seen_key_frame && !pkt.is_key {
continue;
} else if !seen_key_frame {
debug!("{}: have first key frame", self.short_name);
@ -151,10 +145,10 @@ where
let frame_realtime = clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime);
rotate = if let Some(r) = rotate {
if frame_realtime.sec > r && pkt.is_key() {
if frame_realtime.sec > r && pkt.is_key {
trace!("{}: write on normal rotation", self.short_name);
let _t = TimerGuard::new(&clocks, || "closing writer");
w.close(Some(pts))?;
w.close(Some(pkt.pts))?;
None
} else {
Some(r)
@ -186,20 +180,8 @@ where
r
}
};
let orig_data = match pkt.data() {
Some(d) => d,
None => bail!("packet has no data"),
};
let transformed_data = if extra_data.need_transform {
h264::transform_sample_data(orig_data, &mut transformed)?;
transformed.as_slice()
} else {
orig_data
};
let _t = TimerGuard::new(&clocks, || {
format!("writing {} bytes", transformed_data.len())
});
w.write(transformed_data, local_time, pts, pkt.is_key())?;
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len()));
w.write(pkt.data, local_time, pkt.pts, pkt.is_key)?;
rotate = Some(r);
}
if rotate.is_some() {
@ -225,9 +207,9 @@ mod tests {
use std::sync::Arc;
use time;
struct ProxyingStream<'a> {
clocks: &'a clock::SimulatedClocks,
inner: stream::FfmpegStream,
struct ProxyingStream {
clocks: clock::SimulatedClocks,
inner: Box<dyn stream::Stream>,
buffered: time::Duration,
slept: time::Duration,
ts_offset: i64,
@ -235,17 +217,17 @@ mod tests {
pkts_left: u32,
}
impl<'a> ProxyingStream<'a> {
impl ProxyingStream {
fn new(
clocks: &'a clock::SimulatedClocks,
clocks: clock::SimulatedClocks,
buffered: time::Duration,
inner: stream::FfmpegStream,
inner: Box<dyn stream::Stream>,
) -> ProxyingStream {
clocks.sleep(buffered);
ProxyingStream {
clocks: clocks,
inner: inner,
buffered: buffered,
clocks,
inner,
buffered,
slept: time::Duration::seconds(0),
ts_offset: 0,
ts_offset_pkts_left: 0,
@ -254,21 +236,22 @@ mod tests {
}
}
impl<'a> Stream for ProxyingStream<'a> {
fn get_next(&mut self) -> Result<ffmpeg::avcodec::Packet, ffmpeg::Error> {
impl Stream for ProxyingStream {
fn next(&mut self) -> Result<stream::VideoFrame, Error> {
if self.pkts_left == 0 {
return Err(ffmpeg::Error::eof());
bail!("end of stream");
}
self.pkts_left -= 1;
let mut pkt = self.inner.get_next()?;
let mut frame = self.inner.next()?;
// XXX: comment wrong.
// Emulate the behavior of real cameras that send some pre-buffered frames immediately
// on connect. After that, advance clock to the end of this frame.
// Avoid accumulating conversion error by tracking the total amount to sleep and how
// much we've already slept, rather than considering each frame in isolation.
{
let goal = pkt.pts().unwrap() + pkt.duration() as i64;
let goal = frame.pts + i64::from(frame.duration);
let goal = time::Duration::nanoseconds(
goal * 1_000_000_000 / recording::TIME_UNITS_PER_SEC,
);
@ -281,39 +264,31 @@ mod tests {
if self.ts_offset_pkts_left > 0 {
self.ts_offset_pkts_left -= 1;
let old_pts = pkt.pts().unwrap();
let old_dts = pkt.dts();
pkt.set_pts(Some(old_pts + self.ts_offset));
pkt.set_dts(old_dts + self.ts_offset);
frame.pts += self.ts_offset;
// In a real rtsp stream, the duration of a packet is not known until the
// next packet. ffmpeg's duration is an unreliable estimate. Set it to something
// ridiculous.
pkt.set_duration(i32::try_from(3600 * recording::TIME_UNITS_PER_SEC).unwrap());
frame.duration = i32::try_from(3600 * recording::TIME_UNITS_PER_SEC).unwrap();
}
Ok(pkt)
}
fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_> {
self.inner.get_video_codecpar()
}
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
self.inner.get_extra_data()
Ok(frame)
}
}
struct MockOpener<'a> {
expected_url: String,
streams: Mutex<Vec<ProxyingStream<'a>>>,
struct MockOpener {
expected_url: url::Url,
streams: Mutex<Vec<(h264::ExtraData, Box<dyn stream::Stream>)>>,
shutdown: Arc<AtomicBool>,
}
impl<'a> stream::Opener<ProxyingStream<'a>> for MockOpener<'a> {
fn open(&self, src: stream::Source) -> Result<ProxyingStream<'a>, Error> {
impl stream::Opener for MockOpener {
fn open(
&self,
src: stream::Source,
) -> Result<(h264::ExtraData, Box<dyn stream::Stream>), Error> {
match src {
stream::Source::Rtsp { url, .. } => assert_eq!(url, &self.expected_url),
stream::Source::Rtsp { url, .. } => assert_eq!(&url, &self.expected_url),
stream::Source::File(_) => panic!("expected rtsp url"),
};
let mut l = self.streams.lock();
@ -361,16 +336,16 @@ mod tests {
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
let stream = stream::FFMPEG
let (extra_data, stream) = stream::FFMPEG
.open(stream::Source::File("src/testdata/clip.mp4"))
.unwrap();
let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream);
let mut stream = ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), stream);
stream.ts_offset = 123456; // starting pts of the input should be irrelevant
stream.ts_offset_pkts_left = u32::max_value();
stream.pkts_left = u32::max_value();
let opener = MockOpener {
expected_url: "rtsp://foo:bar@test-camera/main".to_owned(),
streams: Mutex::new(vec![stream]),
expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(),
streams: Mutex::new(vec![(extra_data, Box::new(stream))]),
shutdown: Arc::new(AtomicBool::new(false)),
};
let db = testutil::TestDb::new(clocks.clone());
@ -439,5 +414,8 @@ mod tests {
assert_eq!(1, recordings[1].id.recording());
assert_eq!(recording::Time(128700576719993), recordings[1].start);
assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags);
drop(env);
drop(opener);
}
}