From 032bd7657700ef7ca78bef517877be4c0a09c6c0 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Mon, 7 Jun 2021 14:36:53 -0700 Subject: [PATCH] support --rtsp-library=retina (#37) This isn't well-tested and doesn't yet support an initial connection timeout. But in a quick test, it successfully returns video! I'd like to do some more aggressive code restructuring for zero-copy and to have only one writer thread per sample file directory (rather than the syncer thread + one writer thread per RTSP stream). But I'll likely wait until I drop support for ffmpeg entirely. --- server/Cargo.lock | 125 ++++++++++ server/Cargo.toml | 1 + server/db/db.rs | 16 +- server/db/testutil.rs | 4 +- server/src/cmds/config/cameras.rs | 48 ++-- server/src/cmds/run.rs | 14 +- server/src/json.rs | 8 +- server/src/mp4.rs | 47 ++-- server/src/stream.rs | 365 ++++++++++++++++++++++++++---- server/src/streamer.rs | 154 ++++++------- 10 files changed, 582 insertions(+), 200 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index 4cf6a34..9de1303 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -83,6 +83,27 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -322,6 +343,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "cpufeatures" version = "0.1.4" @@ -507,6 +534,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest_auth" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa30657988b2ced88f68fe490889e739bf98d342916c33ed3100af1d6f1cbc9c" +dependencies = [ + "digest", + "hex", + "md-5", + "rand", + "sha2", +] + [[package]] name = "dirs" version = "1.0.5" @@ -862,6 +902,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.11.0" @@ -1120,6 +1166,17 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer", + "digest", + "opaque-debug", +] + [[package]] name = "memchr" version = "2.4.0" @@ -1260,6 +1317,7 @@ dependencies = [ "protobuf", "reffers", "reqwest", + "retina", "ring", "rusqlite", "serde", @@ -1812,6 +1870,35 @@ dependencies = [ "winreg", ] +[[package]] +name = "retina" +version = "0.0.1" +source = "git+https://github.com/scottlamb/retina?branch=main#1bcc864344cf54fdb691a0e11669a663d3c1d7c9" +dependencies = [ + "async-stream", + "base64", + "bitreader", + "bytes", + "digest_auth", + "failure", + "futures", + "h264-reader", + "hex", + "log", + "once_cell", + "pin-project", + "pretty-hex", + "rtcp", + "rtp-rs", + "rtsp-types", + "sdp", + "smallvec", + "time", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "rfc6381-codec" version = "0.1.0" @@ -1848,6 +1935,33 @@ dependencies = [ "winapi", ] +[[package]] +name = "rtcp" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5fb4431b04a948fd91622a75d65a95da3ed2f0be26c902f3d027a23b78fbc96" +dependencies = [ + "bytes", + "thiserror", +] + +[[package]] +name = "rtp-rs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1110d695193d446e901de09921ffbf2d86ae351bbfde9c5b53863ce177e17f5" + +[[package]] +name = "rtsp-types" +version = "0.0.2" +source = "git+https://github.com/sdroege/rtsp-types#53bdf9a74175946572eb3509a6343696724123eb" +dependencies = [ + "cookie-factory", + "nom", + "tinyvec", + "url", +] + [[package]] name = "rusqlite" version = "0.25.3" @@ -1922,6 +2036,17 @@ dependencies = [ "sha2", ] +[[package]] +name = "sdp" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f14aa4ddf1473d50e1664f5c353017981f43f27ede9136a54f60c4bb56d8b152" +dependencies = [ + "rand", + "thiserror", + "url", +] + [[package]] name = "serde" version = "1.0.126" diff --git a/server/Cargo.toml b/server/Cargo.toml index 0b5a268..9c7f368 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -45,6 +45,7 @@ nom = "6.0.0" parking_lot = { version = "0.11.1", features = [] } protobuf = { git = "https://github.com/stepancheg/rust-protobuf" } reffers = "0.6.0" +retina = { git = "https://github.com/scottlamb/retina", branch = "main" } ring = "0.16.2" rusqlite = "0.25.3" serde = { version = "1.0", features = ["derive"] } diff --git a/server/db/db.rs b/server/db/db.rs index 1b1d312..e87ecd9 100644 --- a/server/db/db.rs +++ b/server/db/db.rs @@ -350,8 +350,8 @@ pub struct Camera { pub short_name: String, pub description: String, pub onvif_host: String, - pub username: String, - pub password: String, + pub username: Option, + pub password: Option, pub streams: [Option; 2], } @@ -500,8 +500,8 @@ pub struct CameraChange { pub short_name: String, pub description: String, pub onvif_host: String, - pub username: String, - pub password: String, + pub username: Option, + pub password: Option, /// `StreamType t` is represented by `streams[t.index()]`. A default StreamChange will /// correspond to no stream in the database, provided there are no existing recordings for that @@ -2361,8 +2361,8 @@ mod tests { camera_id = row.id; assert_eq!(uuid, row.uuid); assert_eq!("test-camera", row.onvif_host); - assert_eq!("foo", row.username); - assert_eq!("bar", row.password); + assert_eq!(Some("foo"), row.username.as_deref()); + assert_eq!(Some("bar"), row.password.as_deref()); //assert_eq!("/main", row.main_rtsp_url); //assert_eq!("/sub", row.sub_rtsp_url); //assert_eq!(42, row.retain_bytes); @@ -2513,8 +2513,8 @@ mod tests { short_name: "testcam".to_owned(), description: "".to_owned(), onvif_host: "test-camera".to_owned(), - username: "foo".to_owned(), - password: "bar".to_owned(), + username: Some("foo".to_owned()), + password: Some("bar".to_owned()), streams: [ StreamChange { sample_file_dir_id: Some(sample_file_dir_id), diff --git a/server/db/testutil.rs b/server/db/testutil.rs index 5c81185..50d9fcb 100644 --- a/server/db/testutil.rs +++ b/server/db/testutil.rs @@ -84,8 +84,8 @@ impl TestDb { short_name: "test camera".to_owned(), description: "".to_owned(), onvif_host: "test-camera".to_owned(), - username: "foo".to_owned(), - password: "bar".to_owned(), + username: Some("foo".to_owned()), + password: Some("bar".to_owned()), streams: [ db::StreamChange { sample_file_dir_id: Some(sample_file_dir_id), diff --git a/server/src/cmds/config/cameras.rs b/server/src/cmds/config/cameras.rs index fe068c5..cd853ff 100644 --- a/server/src/cmds/config/cameras.rs +++ b/server/src/cmds/config/cameras.rs @@ -2,7 +2,7 @@ // Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. -use crate::stream::{self, Opener, Stream}; +use crate::stream::{self, Opener}; use base::strutil::{decode_size, encode_size}; use cursive::traits::{Boxable, Finder, Identifiable}; use cursive::views; @@ -35,24 +35,30 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange { .get_content() .as_str() .into(); - let u = siv + let username = match siv .find_name::("username") .unwrap() .get_content() .as_str() - .into(); - let p = siv + { + "" => None, + u => Some(u.to_owned()), + }; + let password = match siv .find_name::("password") .unwrap() .get_content() .as_str() - .into(); + { + "" => None, + p => Some(p.to_owned()), + }; let mut c = db::CameraChange { short_name: sn, description: d, onvif_host: h, - username: u, - password: p, + username, + password, streams: Default::default(), }; for &t in &db::ALL_STREAM_TYPES { @@ -114,12 +120,16 @@ fn press_edit(siv: &mut Cursive, db: &Arc, id: Option) { } } -fn press_test_inner(url: &Url) -> Result { - let stream = stream::FFMPEG.open(stream::Source::Rtsp { - url: url.as_str(), - redacted_url: url.as_str(), // don't need redaction in config UI. +fn press_test_inner( + url: Url, + username: Option, + password: Option, +) -> Result { + let (extra_data, _stream) = stream::FFMPEG.open(stream::Source::Rtsp { + url, + username, + password, })?; - let extra_data = stream.get_extra_data()?; Ok(format!( "{}x{} video stream", extra_data.entry.width, extra_data.entry.height @@ -128,7 +138,7 @@ fn press_test_inner(url: &Url) -> Result { fn press_test(siv: &mut Cursive, t: db::StreamType) { let c = get_change(siv); - let mut url = match Url::parse(&c.streams[t.index()].rtsp_url) { + let url = match Url::parse(&c.streams[t.index()].rtsp_url) { Ok(u) => u, Err(e) => { siv.add_layer( @@ -139,11 +149,9 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) { return; } }; + let username = c.username; + let password = c.password; - if !c.username.is_empty() { - let _ = url.set_username(&c.username); - let _ = url.set_password(Some(&c.password)); - } siv.add_layer( views::Dialog::text(format!( "Testing {} stream at {}. This may take a while \ @@ -159,7 +167,7 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) { siv.set_fps(5); let sink = siv.cb_sink().clone(); ::std::thread::spawn(move || { - let r = press_test_inner(&url); + let r = press_test_inner(url.clone(), username, password); sink.send(Box::new(move |siv: &mut Cursive| { // Polling is no longer necessary. siv.set_fps(0); @@ -442,8 +450,8 @@ fn edit_camera_dialog(db: &Arc, siv: &mut Cursive, item: &Option Result { let streams = l.streams_by_id().len(); let env = streamer::Environment { db: &db, - opener: &*stream::FFMPEG, + opener: args.rtsp_library.opener(), shutdown: &shutdown_streamers, }; @@ -227,6 +234,7 @@ pub async fn run(args: &Args) -> Result { } // Then start up streams. + let handle = tokio::runtime::Handle::current(); let l = db.lock(); for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { if !stream.record { @@ -259,10 +267,12 @@ pub async fn run(args: &Args) -> Result { )?; info!("Starting streamer for {}", streamer.short_name()); let name = format!("s-{}", streamer.short_name()); + let handle = handle.clone(); streamers.push( thread::Builder::new() .name(name) .spawn(move || { + let _enter = handle.enter(); streamer.run(); }) .expect("can't create thread"), diff --git a/server/src/json.rs b/server/src/json.rs index 35b207a..a128514 100644 --- a/server/src/json.rs +++ b/server/src/json.rs @@ -70,8 +70,8 @@ pub struct Camera<'a> { #[serde(rename_all = "camelCase")] pub struct CameraConfig<'a> { pub onvif_host: &'a str, - pub username: &'a str, - pub password: &'a str, + pub username: Option<&'a str>, + pub password: Option<&'a str>, } #[derive(Debug, Serialize)] @@ -191,8 +191,8 @@ impl<'a> Camera<'a> { false => None, true => Some(CameraConfig { onvif_host: &c.onvif_host, - username: &c.username, - password: &c.password, + username: c.username.as_deref(), + password: c.password.as_deref(), }), }, streams: [ diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 38db3b2..b508011 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -1971,7 +1971,7 @@ impl fmt::Debug for File { #[cfg(test)] mod tests { use super::*; - use crate::stream::{self, Opener, Stream}; + use crate::stream::{self, Opener}; use base::clock::RealClocks; use byteorder::{BigEndian, ByteOrder}; use db::recording::{self, TIME_UNITS_PER_SEC}; @@ -2255,13 +2255,12 @@ mod tests { } fn copy_mp4_to_db(db: &TestDb) { - let mut input = stream::FFMPEG + let (extra_data, mut input) = stream::FFMPEG .open(stream::Source::File("src/testdata/clip.mp4")) .unwrap(); // 2015-04-26 00:00:00 UTC. const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - let extra_data = input.get_extra_data().unwrap(); let video_sample_entry_id = db .db .lock() @@ -2286,26 +2285,20 @@ mod tests { let mut frame_time = START_TIME; loop { - let pkt = match input.get_next() { + let pkt = match input.next() { Ok(p) => p, - Err(e) if e.is_eof() => { + Err(e) if e.to_string().contains("End of file") => { break; } Err(e) => { panic!("unexpected input error: {}", e); } }; - let pts = pkt.pts().unwrap(); - frame_time += recording::Duration(pkt.duration() as i64); + frame_time += recording::Duration(i64::from(pkt.duration)); output - .write( - pkt.data().expect("packet without data"), - frame_time, - pts, - pkt.is_key(), - ) + .write(pkt.data, frame_time, pkt.pts, pkt.is_key) .unwrap(); - end_pts = Some(pts + pkt.duration() as i64); + end_pts = Some(pkt.pts + i64::from(pkt.duration)); } output.close(end_pts).unwrap(); db.syncer_channel.flush(); @@ -2373,28 +2366,25 @@ mod tests { } fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) { - let mut orig = stream::FFMPEG + let (orig_extra_data, mut orig) = stream::FFMPEG .open(stream::Source::File("src/testdata/clip.mp4")) .unwrap(); - let mut new = stream::FFMPEG + let (new_extra_data, mut new) = stream::FFMPEG .open(stream::Source::File(new_filename)) .unwrap(); - assert_eq!( - orig.get_extra_data().unwrap(), - new.get_extra_data().unwrap() - ); + assert_eq!(orig_extra_data, new_extra_data); let mut final_durations = None; loop { - let orig_pkt = match orig.get_next() { + let orig_pkt = match orig.next() { Ok(p) => Some(p), - Err(e) if e.is_eof() => None, + Err(e) if e.to_string() == "End of file" => None, Err(e) => { panic!("unexpected input error: {}", e); } }; - let new_pkt = match new.get_next() { + let new_pkt = match new.next() { Ok(p) => Some(p), - Err(e) if e.is_eof() => { + Err(e) if e.to_string() == "End of file" => { break; } Err(e) => { @@ -2406,11 +2396,10 @@ mod tests { (None, None) => break, (o, n) => panic!("orig: {} new: {}", o.is_some(), n.is_some()), }; - assert_eq!(orig_pkt.pts().unwrap(), new_pkt.pts().unwrap() + pts_offset); - assert_eq!(orig_pkt.dts(), new_pkt.dts() + pts_offset); - assert_eq!(orig_pkt.data(), new_pkt.data()); - assert_eq!(orig_pkt.is_key(), new_pkt.is_key()); - final_durations = Some((orig_pkt.duration() as i64, new_pkt.duration() as i64)); + assert_eq!(orig_pkt.pts, new_pkt.pts + pts_offset); + assert_eq!(orig_pkt.data, new_pkt.data); + assert_eq!(orig_pkt.is_key, new_pkt.is_key); + final_durations = Some((i64::from(orig_pkt.duration), i64::from(new_pkt.duration))); } if let Some((orig_dur, new_dur)) = final_durations { diff --git a/server/src/stream.rs b/server/src/stream.rs index 52e4749..f050e2c 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -3,52 +3,108 @@ // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. use crate::h264; +use bytes::Buf; use cstr::cstr; +use failure::format_err; use failure::{bail, Error}; +use futures::StreamExt; use lazy_static::lazy_static; use log::warn; +use retina::client::{Credentials, Playing, Session}; +use retina::codec::{CodecItem, VideoParameters}; use std::convert::TryFrom; use std::ffi::CString; +use std::num::NonZeroU32; use std::result::Result; +use url::Url; -static START: parking_lot::Once = parking_lot::Once::new(); +static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); lazy_static! { pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); } +pub enum RtspLibrary { + Ffmpeg, + Retina, +} + +impl std::str::FromStr for RtspLibrary { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(match s { + "ffmpeg" => RtspLibrary::Ffmpeg, + "retina" => RtspLibrary::Retina, + _ => bail!("unknown RTSP library {:?}", s), + }) + } +} + +impl RtspLibrary { + pub fn opener(&self) -> &'static dyn Opener { + match self { + RtspLibrary::Ffmpeg => &*FFMPEG, + RtspLibrary::Retina => &RETINA, + } + } +} + +#[cfg(test)] pub enum Source<'a> { /// A filename, for testing. - #[cfg(test)] File(&'a str), /// An RTSP stream, for production use. - Rtsp { url: &'a str, redacted_url: &'a str }, + Rtsp { + url: Url, + username: Option, + password: Option, + }, } -pub trait Opener: Sync { - fn open(&self, src: Source) -> Result; +#[cfg(not(test))] +pub enum Source { + /// An RTSP stream, for production use. + Rtsp { + url: Url, + username: Option, + password: Option, + }, } -pub trait Stream { - fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_>; - fn get_extra_data(&self) -> Result; - fn get_next(&mut self) -> Result; +pub trait Opener: Send + Sync { + fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error>; +} + +pub struct VideoFrame<'a> { + pub pts: i64, + + /// An estimate of the duration of the frame, or zero. + /// This can be deceptive and is only used by some testing code. + pub duration: i32, + + pub is_key: bool, + pub data: &'a [u8], +} + +pub trait Stream: Send { + fn next(&mut self) -> Result; } pub struct Ffmpeg {} impl Ffmpeg { fn new() -> Ffmpeg { - START.call_once(|| { + START_FFMPEG.call_once(|| { ffmpeg::Ffmpeg::new(); }); Ffmpeg {} } } -impl Opener for Ffmpeg { - fn open(&self, src: Source) -> Result { +impl Opener for Ffmpeg { + fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error> { use ffmpeg::avformat::InputFormatContext; let mut input = match src { #[cfg(test)] @@ -72,7 +128,11 @@ impl Opener for Ffmpeg { } i } - Source::Rtsp { url, redacted_url } => { + Source::Rtsp { + url, + username, + password, + } => { let mut open_options = ffmpeg::avutil::Dictionary::new(); open_options .set(cstr!("rtsp_transport"), cstr!("tcp")) @@ -98,11 +158,23 @@ impl Opener for Ffmpeg { .set(cstr!("allowed_media_types"), cstr!("video")) .unwrap(); - let i = InputFormatContext::open(&CString::new(url).unwrap(), &mut open_options)?; + let mut url_with_credentials = url.clone(); + if let Some(u) = username.as_deref() { + url_with_credentials + .set_username(u) + .map_err(|_| format_err!("unable to set username on url {}", url))?; + } + url_with_credentials + .set_password(password.as_deref()) + .map_err(|_| format_err!("unable to set password on url {}", url))?; + let i = InputFormatContext::open( + &CString::new(url_with_credentials.as_str())?, + &mut open_options, + )?; if !open_options.empty() { warn!( "While opening URL {}, some options were not understood: {}", - redacted_url, open_options + url, open_options ); } i @@ -127,22 +199,12 @@ impl Opener for Ffmpeg { None => bail!("no video stream"), }; - Ok(FfmpegStream { input, video_i }) - } -} - -pub struct FfmpegStream { - input: ffmpeg::avformat::InputFormatContext<'static>, - video_i: usize, -} - -impl Stream for FfmpegStream { - fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters { - self.input.streams().get(self.video_i).codecpar() - } - - fn get_extra_data(&self) -> Result { - let video = self.input.streams().get(self.video_i); + let video = input.streams().get(video_i); + let codec = video.codecpar(); + let codec_id = codec.codec_id(); + if !codec_id.is_h264() { + bail!("stream's video codec {:?} is not h264", codec_id); + } let tb = video.time_base(); if tb.num != 1 || tb.den != 90000 { bail!( @@ -151,25 +213,234 @@ impl Stream for FfmpegStream { tb.den ); } - let codec = video.codecpar(); - let codec_id = codec.codec_id(); - if !codec_id.is_h264() { - bail!("stream's video codec {:?} is not h264", codec_id); - } let dims = codec.dims(); - h264::ExtraData::parse( + let extra_data = h264::ExtraData::parse( codec.extradata(), u16::try_from(dims.width)?, u16::try_from(dims.height)?, - ) - } - - fn get_next(&mut self) -> Result { - loop { - let p = self.input.read_frame()?; - if p.stream_index() == self.video_i { - return Ok(p); - } - } + )?; + let need_transform = extra_data.need_transform; + let stream = Box::new(FfmpegStream { + input, + video_i, + data: Vec::new(), + need_transform, + }); + Ok((extra_data, stream)) + } +} + +struct FfmpegStream { + input: ffmpeg::avformat::InputFormatContext<'static>, + video_i: usize, + data: Vec, + need_transform: bool, +} + +impl Stream for FfmpegStream { + fn next(&mut self) -> Result { + let pkt = loop { + let pkt = self.input.read_frame()?; + if pkt.stream_index() == self.video_i { + break pkt; + } + }; + let data = pkt + .data() + .ok_or_else(|| format_err!("packet with no data"))?; + if self.need_transform { + h264::transform_sample_data(data, &mut self.data)?; + } else { + // This copy isn't strictly necessary, but this path is only taken in testing anyway. + self.data.clear(); + self.data.extend_from_slice(data); + } + let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; + Ok(VideoFrame { + pts, + is_key: pkt.is_key(), + duration: pkt.duration(), + data: &self.data, + }) + } +} + +pub struct RetinaOpener {} + +pub const RETINA: RetinaOpener = RetinaOpener {}; + +impl Opener for RetinaOpener { + fn open(&self, src: Source) -> Result<(h264::ExtraData, Box), Error> { + let (startup_tx, startup_rx) = tokio::sync::oneshot::channel(); + let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1); + let handle = tokio::runtime::Handle::current(); + let (url, username, password) = match src { + #[cfg(test)] + Source::File(_) => bail!("Retina doesn't support .mp4 files"), + Source::Rtsp { + url, + username, + password, + } => (url, username, password), + }; + let creds = match (username, password) { + (None, None) => None, + (Some(username), Some(password)) => Some(Credentials { username, password }), + _ => bail!("expected username and password together"), + }; + + // TODO: connection timeout. + handle.spawn(async move { + let (session, mut video_params) = match RetinaOpener::play(url, creds).await { + Err(e) => { + let _ = startup_tx.send(Err(e)); + return; + } + Ok((s, v)) => (s, v), + }; + let session = match session.demuxed() { + Ok(s) => s, + Err(e) => { + let _ = startup_tx.send(Err(e)); + return; + } + }; + tokio::pin!(session); + + // First frame. + loop { + match session.next().await { + Some(Err(e)) => { + let _ = startup_tx.send(Err(e)); + return; + } + Some(Ok(CodecItem::VideoFrame(mut v))) => { + if let Some(v) = v.new_parameters.take() { + video_params = v; + } + if v.is_random_access_point { + if startup_tx.send(Ok(video_params)).is_err() { + return; + } + if frame_tx.send(Ok(v)).await.is_err() { + return; + } + break; + } + } + Some(Ok(_)) => {} + None => { + let _ = + startup_tx.send(Err(format_err!("stream closed before first frame"))); + return; + } + } + } + + // Following frames. + let mut need_key_frame = false; + while let Some(item) = session.next().await { + match item { + Err(e) => { + let _ = frame_tx.send(Err(e)).await; + return; + } + Ok(CodecItem::VideoFrame(v)) => { + if v.loss > 0 { + if !v.is_random_access_point { + log::info!( + "lost {} RTP packets; waiting for next key frame @ {:?}", + v.loss, + v.start_ctx() + ); + need_key_frame = true; + continue; + } else { + log::info!( + "lost {} RTP packets; already have key frame @ {:?}", + v.loss, + v.start_ctx() + ); + need_key_frame = false; + } + } else if need_key_frame && !v.is_random_access_point { + continue; + } else if need_key_frame { + log::info!("recovering from loss with key frame @ {:?}", v.start_ctx()); + need_key_frame = false; + } + if frame_tx.send(Ok(v)).await.is_err() { + return; // other end died. + } + } + _ => {} + } + } + }); + let video_params = handle.block_on(startup_rx)??; + let dims = video_params.pixel_dimensions(); + let extra_data = h264::ExtraData::parse( + video_params.extra_data(), + u16::try_from(dims.0)?, + u16::try_from(dims.1)?, + )?; + let stream = Box::new(RetinaStream { + frame_rx, + data: Vec::new(), + }); + Ok((extra_data, stream)) + } +} + +impl RetinaOpener { + async fn play( + url: Url, + creds: Option, + ) -> Result<(Session, VideoParameters), Error> { + let mut session = retina::client::Session::describe(url, creds).await?; + let (video_i, video_params) = session + .streams() + .iter() + .enumerate() + .find_map(|(i, s)| match s.parameters() { + Some(retina::codec::Parameters::Video(v)) => Some((i, v.clone())), + _ => None, + }) + .ok_or_else(|| format_err!("couldn't find H.264 video stream"))?; + session.setup(video_i).await?; + let session = session + .play( + retina::client::PlayPolicy::default() + .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()), + ) + .await?; + Ok((session, video_params)) + } +} + +struct RetinaStream { + frame_rx: tokio::sync::mpsc::Receiver>, + data: Vec, +} + +impl Stream for RetinaStream { + fn next(&mut self) -> Result { + let mut frame = self + .frame_rx + .blocking_recv() + .ok_or_else(|| format_err!("stream ended"))??; + self.data.clear(); + while frame.has_remaining() { + let chunk = frame.chunk(); + self.data.extend_from_slice(chunk); + let len = chunk.len(); + frame.advance(len); + } + Ok(VideoFrame { + pts: frame.timestamp.elapsed(), + duration: 0, + is_key: frame.is_random_access_point, + data: &self.data, + }) } } diff --git a/server/src/streamer.rs b/server/src/streamer.rs index 4cf1a44..03bdb85 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -2,11 +2,10 @@ // Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. -use crate::h264; use crate::stream; use base::clock::{Clocks, TimerGuard}; use db::{dir, recording, writer, Camera, Database, Stream}; -use failure::{bail, format_err, Error}; +use failure::{bail, Error}; use log::{debug, info, trace, warn}; use std::result::Result; use std::sync::atomic::{AtomicBool, Ordering}; @@ -16,22 +15,20 @@ use url::Url; pub static ROTATE_INTERVAL_SEC: i64 = 60; /// Common state that can be used by multiple `Streamer` instances. -pub struct Environment<'a, 'b, C, S> +pub struct Environment<'a, 'tmp, C> where C: Clocks + Clone, - S: 'a + stream::Stream, { - pub opener: &'a dyn stream::Opener, - pub db: &'b Arc>, - pub shutdown: &'b Arc, + pub opener: &'a dyn stream::Opener, + pub db: &'tmp Arc>, + pub shutdown: &'tmp Arc, } /// Connects to a given RTSP stream and writes recordings to the database via [`writer::Writer`]. /// Streamer is meant to be long-lived; it will sleep and retry after each failure. -pub struct Streamer<'a, C, S> +pub struct Streamer<'a, C> where C: Clocks + Clone, - S: 'a + stream::Stream, { shutdown: Arc, @@ -41,20 +38,20 @@ where db: Arc>, dir: Arc, syncer_channel: writer::SyncerChannel<::std::fs::File>, - opener: &'a dyn stream::Opener, + opener: &'a dyn stream::Opener, stream_id: i32, short_name: String, url: Url, - redacted_url: Url, + username: Option, + password: Option, } -impl<'a, C, S> Streamer<'a, C, S> +impl<'a, C> Streamer<'a, C> where C: 'a + Clocks + Clone, - S: 'a + stream::Stream, { - pub fn new<'b>( - env: &Environment<'a, 'b, C, S>, + pub fn new<'tmp>( + env: &Environment<'a, 'tmp, C>, dir: Arc, syncer_channel: writer::SyncerChannel<::std::fs::File>, stream_id: i32, @@ -63,14 +60,9 @@ where rotate_offset_sec: i64, rotate_interval_sec: i64, ) -> Result { - let mut url = Url::parse(&s.rtsp_url)?; - let mut redacted_url = url.clone(); - if !c.username.is_empty() { - url.set_username(&c.username) - .map_err(|_| format_err!("can't set username"))?; - redacted_url.set_username(&c.username).unwrap(); - url.set_password(Some(&c.password)).unwrap(); - redacted_url.set_password(Some("redacted")).unwrap(); + let url = Url::parse(&s.rtsp_url)?; + if !url.username().is_empty() || url.password().is_some() { + bail!("RTSP URL shouldn't include credentials"); } Ok(Streamer { shutdown: env.shutdown.clone(), @@ -83,7 +75,8 @@ where stream_id, short_name: format!("{}-{}", c.short_name, s.type_.as_str()), url, - redacted_url, + username: c.username.clone(), + password: c.password.clone(), }) } @@ -91,6 +84,9 @@ where &self.short_name } + /// Runs the streamer; blocks. + /// Note that when using Retina as the RTSP library, this must be called + /// within a tokio runtime context; see [tokio::runtime::Handle]. pub fn run(&mut self) { while !self.shutdown.load(Ordering::SeqCst) { if let Err(e) = self.run_once() { @@ -108,18 +104,18 @@ where } fn run_once(&mut self) -> Result<(), Error> { - info!("{}: Opening input: {}", self.short_name, self.redacted_url); + info!("{}: Opening input: {}", self.short_name, self.url.as_str()); let clocks = self.db.clocks(); - let mut stream = { - let _t = TimerGuard::new(&clocks, || format!("opening {}", self.redacted_url)); + let (extra_data, mut stream) = { + let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); self.opener.open(stream::Source::Rtsp { - url: self.url.as_str(), - redacted_url: self.redacted_url.as_str(), + url: self.url.clone(), + username: self.username.clone(), + password: self.password.clone(), })? }; let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); - let extra_data = stream.get_extra_data()?; let video_sample_entry_id = { let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); self.db.lock().insert_video_sample_entry(extra_data.entry)? @@ -128,7 +124,6 @@ where // Seconds since epoch at which to next rotate. let mut rotate: Option = None; - let mut transformed = Vec::new(); let mut w = writer::Writer::new( &self.dir, &self.db, @@ -139,10 +134,9 @@ where while !self.shutdown.load(Ordering::SeqCst) { let pkt = { let _t = TimerGuard::new(&clocks, || "getting next packet"); - stream.get_next()? + stream.next()? }; - let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; - if !seen_key_frame && !pkt.is_key() { + if !seen_key_frame && !pkt.is_key { continue; } else if !seen_key_frame { debug!("{}: have first key frame", self.short_name); @@ -151,10 +145,10 @@ where let frame_realtime = clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); rotate = if let Some(r) = rotate { - if frame_realtime.sec > r && pkt.is_key() { + if frame_realtime.sec > r && pkt.is_key { trace!("{}: write on normal rotation", self.short_name); let _t = TimerGuard::new(&clocks, || "closing writer"); - w.close(Some(pts))?; + w.close(Some(pkt.pts))?; None } else { Some(r) @@ -186,20 +180,8 @@ where r } }; - let orig_data = match pkt.data() { - Some(d) => d, - None => bail!("packet has no data"), - }; - let transformed_data = if extra_data.need_transform { - h264::transform_sample_data(orig_data, &mut transformed)?; - transformed.as_slice() - } else { - orig_data - }; - let _t = TimerGuard::new(&clocks, || { - format!("writing {} bytes", transformed_data.len()) - }); - w.write(transformed_data, local_time, pts, pkt.is_key())?; + let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len())); + w.write(pkt.data, local_time, pkt.pts, pkt.is_key)?; rotate = Some(r); } if rotate.is_some() { @@ -225,9 +207,9 @@ mod tests { use std::sync::Arc; use time; - struct ProxyingStream<'a> { - clocks: &'a clock::SimulatedClocks, - inner: stream::FfmpegStream, + struct ProxyingStream { + clocks: clock::SimulatedClocks, + inner: Box, buffered: time::Duration, slept: time::Duration, ts_offset: i64, @@ -235,17 +217,17 @@ mod tests { pkts_left: u32, } - impl<'a> ProxyingStream<'a> { + impl ProxyingStream { fn new( - clocks: &'a clock::SimulatedClocks, + clocks: clock::SimulatedClocks, buffered: time::Duration, - inner: stream::FfmpegStream, + inner: Box, ) -> ProxyingStream { clocks.sleep(buffered); ProxyingStream { - clocks: clocks, - inner: inner, - buffered: buffered, + clocks, + inner, + buffered, slept: time::Duration::seconds(0), ts_offset: 0, ts_offset_pkts_left: 0, @@ -254,21 +236,22 @@ mod tests { } } - impl<'a> Stream for ProxyingStream<'a> { - fn get_next(&mut self) -> Result { + impl Stream for ProxyingStream { + fn next(&mut self) -> Result { if self.pkts_left == 0 { - return Err(ffmpeg::Error::eof()); + bail!("end of stream"); } self.pkts_left -= 1; - let mut pkt = self.inner.get_next()?; + let mut frame = self.inner.next()?; + // XXX: comment wrong. // Emulate the behavior of real cameras that send some pre-buffered frames immediately // on connect. After that, advance clock to the end of this frame. // Avoid accumulating conversion error by tracking the total amount to sleep and how // much we've already slept, rather than considering each frame in isolation. { - let goal = pkt.pts().unwrap() + pkt.duration() as i64; + let goal = frame.pts + i64::from(frame.duration); let goal = time::Duration::nanoseconds( goal * 1_000_000_000 / recording::TIME_UNITS_PER_SEC, ); @@ -281,39 +264,31 @@ mod tests { if self.ts_offset_pkts_left > 0 { self.ts_offset_pkts_left -= 1; - let old_pts = pkt.pts().unwrap(); - let old_dts = pkt.dts(); - pkt.set_pts(Some(old_pts + self.ts_offset)); - pkt.set_dts(old_dts + self.ts_offset); + frame.pts += self.ts_offset; // In a real rtsp stream, the duration of a packet is not known until the // next packet. ffmpeg's duration is an unreliable estimate. Set it to something // ridiculous. - pkt.set_duration(i32::try_from(3600 * recording::TIME_UNITS_PER_SEC).unwrap()); + frame.duration = i32::try_from(3600 * recording::TIME_UNITS_PER_SEC).unwrap(); } - Ok(pkt) - } - - fn get_video_codecpar(&self) -> ffmpeg::avcodec::InputCodecParameters<'_> { - self.inner.get_video_codecpar() - } - - fn get_extra_data(&self) -> Result { - self.inner.get_extra_data() + Ok(frame) } } - struct MockOpener<'a> { - expected_url: String, - streams: Mutex>>, + struct MockOpener { + expected_url: url::Url, + streams: Mutex)>>, shutdown: Arc, } - impl<'a> stream::Opener> for MockOpener<'a> { - fn open(&self, src: stream::Source) -> Result, Error> { + impl stream::Opener for MockOpener { + fn open( + &self, + src: stream::Source, + ) -> Result<(h264::ExtraData, Box), Error> { match src { - stream::Source::Rtsp { url, .. } => assert_eq!(url, &self.expected_url), + stream::Source::Rtsp { url, .. } => assert_eq!(&url, &self.expected_url), stream::Source::File(_) => panic!("expected rtsp url"), }; let mut l = self.streams.lock(); @@ -361,16 +336,16 @@ mod tests { let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC - let stream = stream::FFMPEG + let (extra_data, stream) = stream::FFMPEG .open(stream::Source::File("src/testdata/clip.mp4")) .unwrap(); - let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream); + let mut stream = ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), stream); stream.ts_offset = 123456; // starting pts of the input should be irrelevant stream.ts_offset_pkts_left = u32::max_value(); stream.pkts_left = u32::max_value(); let opener = MockOpener { - expected_url: "rtsp://foo:bar@test-camera/main".to_owned(), - streams: Mutex::new(vec![stream]), + expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(), + streams: Mutex::new(vec![(extra_data, Box::new(stream))]), shutdown: Arc::new(AtomicBool::new(false)), }; let db = testutil::TestDb::new(clocks.clone()); @@ -439,5 +414,8 @@ mod tests { assert_eq!(1, recordings[1].id.recording()); assert_eq!(recording::Time(128700576719993), recordings[1].start); assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags); + + drop(env); + drop(opener); } }