diff --git a/design/time.md b/design/time.md index ce503c5..965c819 100644 --- a/design/time.md +++ b/design/time.md @@ -125,9 +125,6 @@ information: beginning of the session. Some cameras omit them entirely depending on firmware version, as noted in [this forum post](https://www.cctvforum.com/topic/40914-video-sync-with-hikvision-ipcams-tech-query-about-rtcp/). - Additionally, Moonfire NVR currently uses ffmpeg's libavformat for RTSP - protocol handling; this library exposes these reports in a limited - fashion. The camera records video frames as in the diagram below: diff --git a/docker/deploy.bash b/docker/deploy.bash index a98bada..3e41e87 100755 --- a/docker/deploy.bash +++ b/docker/deploy.bash @@ -19,7 +19,6 @@ uname -a export DEBIAN_FRONTEND=noninteractive time apt-get update time apt-get install --assume-yes --no-install-recommends \ - ffmpeg \ libncurses6 \ libncursesw6 \ locales \ diff --git a/docker/dev.bash b/docker/dev.bash index b99205b..91c8711 100755 --- a/docker/dev.bash +++ b/docker/dev.bash @@ -78,10 +78,6 @@ time apt-get update # Install the packages for the target architecture. packages+=( - ffmpeg"${apt_target_suffix}" - libavcodec-dev"${apt_target_suffix}" - libavformat-dev"${apt_target_suffix}" - libavutil-dev"${apt_target_suffix}" libncurses-dev"${apt_target_suffix}" libsqlite3-dev"${apt_target_suffix}" ) diff --git a/guide/build.md b/guide/build.md index 4a02f01..de34a48 100644 --- a/guide/build.md +++ b/guide/build.md @@ -168,15 +168,6 @@ Linux VM and filesystem overlay. To build the server, you will need the following C libraries installed: -* [ffmpeg](http://ffmpeg.org/) version 2.x or 3.x, including `libavutil`, - `libavcodec` (to inspect H.264 frames), and `libavformat` (to connect to - RTSP servers and write `.mp4` files). - - Note ffmpeg library versions older than 55.1.101, along with all versions of - the competing project [libav](http://libav.org), don't support socket - timeouts for RTSP. For reliable reconnections on error, it's strongly - recommended to use ffmpeg library versions >= 55.1.101. - * [SQLite3](https://www.sqlite.org/), at least version 3.8.2. (You can skip this if you compile with `--features=bundled` and don't mind the `moonfire-nvr sql` command not working.) @@ -213,7 +204,7 @@ On macOS with [Homebrew](https://brew.sh/) and Xcode installed, try the following command: ```console -$ brew install ffmpeg node +$ brew install node ``` Next, you need Rust 1.56+ and Cargo. The easiest way to install them is by diff --git a/guide/troubleshooting.md b/guide/troubleshooting.md index 4285506..77e75ed 100644 --- a/guide/troubleshooting.md +++ b/guide/troubleshooting.md @@ -11,7 +11,6 @@ need more help. * [Camera stream errors](#camera-stream-errors) * [Problems](#problems) * [Server errors](#server-errors) - * [Problems reading video from cameras](#problems-reading-video-from-cameras) * [`clock_gettime failed: EPERM: Operation not permitted`](#clock_gettime-failed-eperm-operation-not-permitted) * [`Error: pts not monotonically increasing; got 26615520 then 26539470`](#error-pts-not-monotonically-increasing-got-26615520-then-26539470) * [Out of disk space](#out-of-disk-space) @@ -88,8 +87,7 @@ Moonfire NVR names a few important thread types as follows: * `main`: during `moonfire-nvr run`, the main thread does initial setup then just waits for the other threads. In other subcommands, it does everything. * `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main` or `sub`): these - threads write video data to disk. When using `--rtsp-library=ffmpeg`, they - also read the video data from the cameras via RTSP. + threads receive video from the cameras (via RTSP) and write it to disk. * `sync-PATH` (one per sample file directory): These threads call `fsync` to * commit sample files to disk, delete old sample files, and flush the database. @@ -210,14 +208,13 @@ W20201228 21:27:11.402 s-driveway-sub moonfire_base::clock] writing 37 bytes too ### Camera stream errors Warnings like the following indicate that a camera stream was lost due to some -error and Moonfire NVR will try reconnecting shortly. In this case, -`End of file` means that the camera ended the stream. This might happen when the -camera is rebooting or if Moonfire is not consuming packets quickly enough. -In the latter case, you'll likely see a `getting next packet took PT...S!` -message as described above. +error and Moonfire NVR will try reconnecting shortly. `Stream ended` might +happen when the camera is rebooting or if Moonfire is not consuming packets +quickly enough. In the latter case, you'll likely see a +`getting next packet took PT...S!` message as described above. ``` -W20210309 00:28:55.527 s-courtyard-sub moonfire_nvr::streamer] courtyard-sub: sleeping for PT1S after error: End of file +W20210309 00:28:55.527 s-courtyard-sub moonfire_nvr::streamer] courtyard-sub: sleeping for PT1S after error: Stream ended (set environment variable RUST_BACKTRACE=1 to see backtraces) ``` @@ -225,13 +222,6 @@ W20210309 00:28:55.527 s-courtyard-sub moonfire_nvr::streamer] courtyard-sub: sl ### Server errors -#### Problems reading video from cameras - -Moonfire NVR is switching its RTSP handling from ffmpeg to a pure-Rust -library developed by Moonfire NVR's author. If it doesn't read camera -data successfully, please try restarting with `--rtsp-library=ffmpeg` to see -if the problem goes away. Then please file a bug! - #### `clock_gettime failed: EPERM: Operation not permitted` If commands fail with an error like the following, you're likely running diff --git a/server/Cargo.lock b/server/Cargo.lock index cce07a4..6b430ec 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1078,19 +1078,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "moonfire-ffmpeg" -version = "0.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccb80b7a294201578799191cd66b077cd90cef999f461edc7919465fc118a135" -dependencies = [ - "cc", - "libc", - "log", - "parking_lot", - "pkg-config", -] - [[package]] name = "moonfire-nvr" version = "0.7.2" @@ -1100,7 +1087,6 @@ dependencies = [ "byteorder", "bytes", "clap", - "cstr", "cursive", "failure", "fnv", @@ -1115,7 +1101,7 @@ dependencies = [ "memchr", "moonfire-base", "moonfire-db", - "moonfire-ffmpeg", + "mp4", "mylog", "nix", "nom", @@ -1144,6 +1130,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "mp4" +version = "0.9.2" +source = "git+https://github.com/scottlamb/mp4-rust?branch=moonfire#388fb47653305fb153de4e11d8cbc6f307e02ee5" +dependencies = [ + "byteorder", + "bytes", + "num-rational", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "mp4ra-rust" version = "0.1.0" @@ -1227,6 +1226,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-complex" version = "0.4.0" @@ -1264,8 +1274,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ "autocfg", + "num-bigint", "num-integer", "num-traits", + "serde", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index 4683aa6..022bc8b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,11 +26,9 @@ blake3 = "1.0.0" bytes = "1" byteorder = "1.0" clap = { version = "2.33.3", default-features = false, features = ["color", "wrap_help"] } -cstr = "0.2.5" cursive = "0.17.0" db = { package = "moonfire-db", path = "db" } failure = "0.1.1" -ffmpeg = { package = "moonfire-ffmpeg", version = "0.0.2" } futures = "0.3" fnv = "1.0" h264-reader = "0.5.0" @@ -66,6 +64,7 @@ url = "2.1.1" uuid = { version = "0.8", features = ["serde", "std", "v4"] } [dev-dependencies] +mp4 = { git = "https://github.com/scottlamb/mp4-rust", branch = "moonfire" } num-rational = { version = "0.4.0", default-features = false, features = ["std"] } reqwest = { version = "0.11.0", default-features = false, features = ["json"] } tempfile = "3.2.0" diff --git a/server/src/cmds/config/cameras.rs b/server/src/cmds/config/cameras.rs index 4853629..f20355f 100644 --- a/server/src/cmds/config/cameras.rs +++ b/server/src/cmds/config/cameras.rs @@ -10,6 +10,7 @@ use cursive::Cursive; use db::writer; use failure::{bail, format_err, Error, ResultExt}; use std::collections::BTreeMap; +use std::str::FromStr; use std::sync::Arc; use url::Url; @@ -202,32 +203,44 @@ fn press_edit(siv: &mut Cursive, db: &Arc, id: Option) { } } -fn press_test_inner(url: Url, username: String, password: String) -> Result { - let pass_creds = !username.is_empty(); - let (extra_data, _stream) = stream::FFMPEG.open( +fn press_test_inner( + url: Url, + username: String, + password: String, + transport: retina::client::Transport, +) -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build()?; + let _guard = rt.enter(); + let (extra_data, _stream) = stream::OPENER.open( + &rt, "test stream".to_owned(), - stream::Source::Rtsp { - url, - username: if pass_creds { Some(username) } else { None }, - password: if pass_creds { Some(password) } else { None }, - transport: retina::client::Transport::Tcp, - session_group: Default::default(), - }, + url, + retina::client::SessionOptions::default() + .creds(if username.is_empty() { + None + } else { + Some(retina::client::Credentials { username, password }) + }) + .transport(transport), )?; Ok(format!( "{}x{} video stream", - extra_data.entry.width, extra_data.entry.height + extra_data.width, extra_data.height )) } fn press_test(siv: &mut Cursive, t: db::StreamType) { let c = get_camera(siv); - let url = &c.streams[t.index()].url; - let url = match parse_url(url, &["rtsp"]) { + let s = &c.streams[t.index()]; + let transport = retina::client::Transport::from_str(s.rtsp_transport).unwrap_or_default(); + let url = match parse_url(&s.url, &["rtsp"]) { Ok(Some(u)) => u, _ => panic!( "test button should only be enabled with valid URL, not {:?}", - url + &s.url ), }; let username = c.username; @@ -248,7 +261,7 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) { siv.set_fps(5); let sink = siv.cb_sink().clone(); ::std::thread::spawn(move || { - let r = press_test_inner(url.clone(), username, password); + let r = press_test_inner(url.clone(), username, password, transport); sink.send(Box::new(move |siv: &mut Cursive| { // Polling is no longer necessary. siv.set_fps(0); diff --git a/server/src/cmds/run/config.rs b/server/src/cmds/run/config.rs index 2b0871c..2aa9d21 100644 --- a/server/src/cmds/run/config.rs +++ b/server/src/cmds/run/config.rs @@ -35,13 +35,6 @@ pub struct ConfigFile { /// Defaults to the number of cores on the system. #[serde(default)] pub worker_threads: Option, - - /// RTSP library to use for fetching the cameras' video stream. - /// Moonfire NVR is in the process of switching from `ffmpeg` (used since - /// the beginning of the project) to `retina` (a pure-Rust RTSP library - /// developed by Moonfire NVR's author). - #[serde(default)] - pub rtsp_library: crate::stream::RtspLibrary, } /// Per-bind configuration. diff --git a/server/src/cmds/run/mod.rs b/server/src/cmds/run/mod.rs index ca880c5..8978ce8 100644 --- a/server/src/cmds/run/mod.rs +++ b/server/src/cmds/run/mod.rs @@ -270,7 +270,7 @@ async fn inner( let streams = l.streams_by_id().len(); let env = streamer::Environment { db: &db, - opener: config.rtsp_library.opener(), + opener: &crate::stream::OPENER, shutdown_rx: &shutdown_rx, }; @@ -302,7 +302,6 @@ async fn inner( } // Then start up streams. - let handle = tokio::runtime::Handle::current(); let l = db.lock(); for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { if stream.config.mode != db::json::STREAM_MODE_RECORD { @@ -340,14 +339,10 @@ async fn inner( )?; info!("Starting streamer for {}", streamer.short_name()); let name = format!("s-{}", streamer.short_name()); - let handle = handle.clone(); streamers.push( thread::Builder::new() .name(name) - .spawn(move || { - let _enter = handle.enter(); - streamer.run(); - }) + .spawn(move || streamer.run()) .expect("can't create thread"), ); } @@ -397,7 +392,9 @@ async fn inner( let db = db.clone(); move || { for streamer in streamers.drain(..) { - streamer.join().unwrap(); + if streamer.join().is_err() { + log::error!("streamer panicked; look for previous panic message"); + } } if let Some(mut ss) = syncers { // The syncers shut down when all channels to them have been dropped. diff --git a/server/src/h264.rs b/server/src/h264.rs index 123f619..e3c23ae 100644 --- a/server/src/h264.rs +++ b/server/src/h264.rs @@ -19,16 +19,10 @@ //! would be more trouble than it's worth. use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use db::VideoSampleEntryToInsert; use failure::{bail, format_err, Error}; use std::convert::TryFrom; -// See ISO/IEC 14496-10 table 7-1 - NAL unit type codes, syntax element categories, and NAL unit -// type classes. -const NAL_UNIT_SEQ_PARAMETER_SET: u8 = 7; -const NAL_UNIT_PIC_PARAMETER_SET: u8 = 8; - -const NAL_UNIT_TYPE_MASK: u8 = 0x1F; // bottom 5 bits of first byte of unit. - // For certain common sub stream anamorphic resolutions, add a pixel aspect ratio box. // Assume the camera is 16x9. These are just the standard wide mode; default_pixel_aspect_ratio // tries the transpose also. @@ -49,7 +43,7 @@ const PIXEL_ASPECT_RATIOS: [((u16, u16), (u16, u16)); 6] = [ /// doesn't tell us what to do. /// /// Note that at least in the case of .mp4 muxing, we don't need to fix up the underlying SPS. -/// SPS; PixelAspectRatioBox's definition says that it overrides the H.264-level declaration. +/// PixelAspectRatioBox's definition says that it overrides the H.264-level declaration. fn default_pixel_aspect_ratio(width: u16, height: u16) -> (u16, u16) { if width >= height { PIXEL_ASPECT_RATIOS @@ -66,257 +60,121 @@ fn default_pixel_aspect_ratio(width: u16, height: u16) -> (u16, u16) { } } -/// Decodes a H.264 Annex B byte stream into NAL units. Calls `f` for each NAL unit in the byte -/// stream. Aborts if `f` returns error. -/// -/// Note `f` is called with the encoded NAL form, not the RBSP. The NAL header byte and any -/// emulation prevention bytes will be present. -/// -/// See ISO/IEC 14496-10 section B.2: Byte stream NAL unit decoding process. -/// This is a relatively simple, unoptimized implementation. -/// -/// TODO: detect invalid byte streams. For example, several 0x00s not followed by a 0x01, a stream -/// stream not starting with 0x00 0x00 0x00 0x01, or an empty NAL unit. -fn decode_h264_annex_b<'a, F>(mut data: &'a [u8], mut f: F) -> Result<(), Error> -where - F: FnMut(&'a [u8]) -> Result<(), Error>, -{ - let start_code = &b"\x00\x00\x01"[..]; - use nom::FindSubstring; - 'outer: while let Some(pos) = data.find_substring(start_code) { - let mut unit = &data[0..pos]; - data = &data[pos + start_code.len()..]; - // Have zero or more bytes that end in a start code. Strip out any trailing 0x00s and - // process the unit if there's anything left. - loop { - match unit.last() { - None => continue 'outer, - Some(b) if *b == 0 => { - unit = &unit[..unit.len() - 1]; - } - Some(_) => break, - } - } - f(unit)?; +/// Parses the `AvcDecoderConfigurationRecord` in the "extra data". +pub fn parse_extra_data(extradata: &[u8]) -> Result { + let avcc = h264_reader::avcc::AvcDecoderConfigurationRecord::try_from(extradata) + .map_err(|e| format_err!("Bad AvcDecoderConfigurationRecord: {:?}", e))?; + if avcc.num_of_sequence_parameter_sets() != 1 { + bail!("Multiple SPSs!"); } - - // No remaining start codes; likely a unit left. - if !data.is_empty() { - f(data)?; - } - Ok(()) -} - -/// Parses Annex B extra data, returning a tuple holding the `sps` and `pps` substrings. -fn parse_annex_b_extra_data(data: &[u8]) -> Result<(&[u8], &[u8]), Error> { - let mut sps = None; - let mut pps = None; - decode_h264_annex_b(data, |unit| { - let nal_type = (unit[0] as u8) & NAL_UNIT_TYPE_MASK; - match nal_type { - NAL_UNIT_SEQ_PARAMETER_SET => sps = Some(unit), - NAL_UNIT_PIC_PARAMETER_SET => pps = Some(unit), - _ => bail!("Expected SPS and PPS; got type {}", nal_type), - }; - Ok(()) + let ctx = avcc + .create_context(()) + .map_err(|e| format_err!("Can't load SPS+PPS: {:?}", e))?; + let sps = ctx + .sps_by_id(h264_reader::nal::pps::ParamSetId::from_u32(0).unwrap()) + .ok_or_else(|| format_err!("No SPS 0"))?; + let pixel_dimensions = sps + .pixel_dimensions() + .map_err(|e| format_err!("SPS has invalid pixel dimensions: {:?}", e))?; + let width = u16::try_from(pixel_dimensions.0).map_err(|_| { + format_err!( + "bad dimensions {}x{}", + pixel_dimensions.0, + pixel_dimensions.1 + ) })?; - match (sps, pps) { - (Some(s), Some(p)) => Ok((s, p)), - _ => bail!("SPS and PPS must be specified"), - } -} - -/// Parsed representation of ffmpeg's "extradata". -#[derive(Debug, PartialEq, Eq)] -pub struct ExtraData { - pub entry: db::VideoSampleEntryToInsert, - - /// True iff sample data should be transformed from Annex B format to AVC format via a call to - /// `transform_sample_data`. (The assumption is that if the extra data was in Annex B format, - /// the sample data is also.) - pub need_transform: bool, -} - -impl ExtraData { - /// Parses "extradata" from ffmpeg. This data may be in either Annex B format or AVC format. - pub fn parse(extradata: &[u8], width: u16, height: u16) -> Result { - let raw_sps_and_pps; - let need_transform; - let ctx; - let sps_owner; - let sps; // reference to either within ctx or to sps_owner. - if extradata.starts_with(b"\x00\x00\x00\x01") || extradata.starts_with(b"\x00\x00\x01") { - // ffmpeg supplied "extradata" in Annex B format. - let (s, p) = parse_annex_b_extra_data(extradata)?; - let rbsp = h264_reader::rbsp::decode_nal(&s[1..]); - sps_owner = h264_reader::nal::sps::SeqParameterSet::from_bytes(&rbsp) - .map_err(|e| format_err!("Bad SPS: {:?}", e))?; - sps = &sps_owner; - raw_sps_and_pps = Some((s, p)); - need_transform = true; - } else { - // Assume "extradata" holds an AVCDecoderConfiguration. - need_transform = false; - raw_sps_and_pps = None; - let avcc = h264_reader::avcc::AvcDecoderConfigurationRecord::try_from(extradata) - .map_err(|e| format_err!("Bad AvcDecoderConfigurationRecord: {:?}", e))?; - if avcc.num_of_sequence_parameter_sets() != 1 { - bail!("Multiple SPSs!"); - } - ctx = avcc - .create_context(()) - .map_err(|e| format_err!("Can't load SPS+PPS: {:?}", e))?; - sps = ctx - .sps_by_id(h264_reader::nal::pps::ParamSetId::from_u32(0).unwrap()) - .ok_or_else(|| format_err!("No SPS 0"))?; - }; - - let mut sample_entry = Vec::with_capacity(256); - - // This is a concatenation of the following boxes/classes. - - // SampleEntry, ISO/IEC 14496-12 section 8.5.2. - let avc1_len_pos = sample_entry.len(); - // length placeholder + type + reserved + data_reference_index = 1 - sample_entry.extend_from_slice(b"\x00\x00\x00\x00avc1\x00\x00\x00\x00\x00\x00\x00\x01"); - - // VisualSampleEntry, ISO/IEC 14496-12 section 12.1.3. - sample_entry.extend_from_slice(&[0; 16]); // pre-defined + reserved - sample_entry.write_u16::(width)?; - sample_entry.write_u16::(height)?; - sample_entry.extend_from_slice(&[ - 0x00, 0x48, 0x00, 0x00, // horizresolution - 0x00, 0x48, 0x00, 0x00, // vertresolution - 0x00, 0x00, 0x00, 0x00, // reserved - 0x00, 0x01, // frame count - 0x00, 0x00, 0x00, 0x00, // compressorname - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x00, 0x00, 0x00, // - 0x00, 0x18, 0xff, 0xff, // depth + pre_defined - ]); - - // AVCSampleEntry, ISO/IEC 14496-15 section 5.3.4.1. - // AVCConfigurationBox, ISO/IEC 14496-15 section 5.3.4.1. - let avcc_len_pos = sample_entry.len(); - sample_entry.extend_from_slice(b"\x00\x00\x00\x00avcC"); - - if let Some((sps, pps)) = raw_sps_and_pps { - // Create the AVCDecoderConfiguration, ISO/IEC 14496-15 section 5.2.4.1. - // The beginning of the AVCDecoderConfiguration takes a few values from - // the SPS (ISO/IEC 14496-10 section 7.3.2.1.1). One caveat: that section - // defines the syntax in terms of RBSP, not NAL. The difference is the - // escaping of 00 00 01 and 00 00 02; see notes about - // "emulation_prevention_three_byte" in ISO/IEC 14496-10 section 7.4. - // It looks like 00 is not a valid value of profile_idc, so this distinction - // shouldn't be relevant here. And ffmpeg seems to ignore it. - sample_entry.push(1); // configurationVersion - sample_entry.push(sps[1]); // profile_idc . AVCProfileIndication - sample_entry.push(sps[2]); // ...misc bits... . profile_compatibility - sample_entry.push(sps[3]); // level_idc . AVCLevelIndication - - // Hardcode lengthSizeMinusOne to 3, matching TransformSampleData's 4-byte - // lengths. - sample_entry.push(0xff); - - // Only support one SPS and PPS. - // ffmpeg's ff_isom_write_avcc has the same limitation, so it's probably - // fine. This next byte is a reserved 0b111 + a 5-bit # of SPSs (1). - sample_entry.push(0xe1); - sample_entry.write_u16::(u16::try_from(sps.len())?)?; - sample_entry.extend_from_slice(sps); - sample_entry.push(1); // # of PPSs. - sample_entry.write_u16::(u16::try_from(pps.len())?)?; - sample_entry.extend_from_slice(pps); - } else { - sample_entry.extend_from_slice(extradata); - }; - - // Fix up avc1 and avcC box lengths. - let cur_pos = sample_entry.len(); - BigEndian::write_u32( - &mut sample_entry[avcc_len_pos..avcc_len_pos + 4], - u32::try_from(cur_pos - avcc_len_pos)?, - ); - - // PixelAspectRatioBox, ISO/IEC 14496-12 section 12.1.4.2. - // Write a PixelAspectRatioBox if necessary, as the sub streams can be be anamorphic. - let pasp = sps - .vui_parameters - .as_ref() - .and_then(|v| v.aspect_ratio_info.as_ref()) - .and_then(|a| a.clone().get()) - .unwrap_or_else(|| default_pixel_aspect_ratio(width, height)); - if pasp != (1, 1) { - sample_entry.extend_from_slice(b"\x00\x00\x00\x10pasp"); // length + box name - sample_entry.write_u32::(pasp.0.into())?; - sample_entry.write_u32::(pasp.1.into())?; - } - - let cur_pos = sample_entry.len(); - BigEndian::write_u32( - &mut sample_entry[avc1_len_pos..avc1_len_pos + 4], - u32::try_from(cur_pos - avc1_len_pos)?, - ); - - let profile_idc = sample_entry[103]; - let constraint_flags = sample_entry[104]; - let level_idc = sample_entry[105]; - - let rfc6381_codec = format!( - "avc1.{:02x}{:02x}{:02x}", - profile_idc, constraint_flags, level_idc - ); - Ok(ExtraData { - entry: db::VideoSampleEntryToInsert { - data: sample_entry, - rfc6381_codec, - width, - height, - pasp_h_spacing: pasp.0, - pasp_v_spacing: pasp.1, - }, - need_transform, - }) - } -} - -/// Transforms sample data from Annex B format to AVC format. Should be called on samples iff -/// `ExtraData::need_transform` is true. Uses an out parameter `avc_sample` rather than a return -/// so that memory allocations can be reused from sample to sample. -pub fn transform_sample_data(annexb_sample: &[u8], avc_sample: &mut Vec) -> Result<(), Error> { - // See AVCParameterSamples, ISO/IEC 14496-15 section 5.3.2. - avc_sample.clear(); - - // The output will be about as long as the input. Annex B stop codes require at least three - // bytes; many seem to be four. The output lengths are exactly four. - avc_sample.reserve(annexb_sample.len() + 4); - decode_h264_annex_b(annexb_sample, |unit| { - // 4-byte length; this must match ParseExtraData's lengthSizeMinusOne == 3. - avc_sample.write_u32::(unit.len() as u32)?; // length - avc_sample.extend_from_slice(unit); - Ok(()) + let height = u16::try_from(pixel_dimensions.1).map_err(|_| { + format_err!( + "bad dimensions {}x{}", + pixel_dimensions.0, + pixel_dimensions.1 + ) })?; - Ok(()) + + let mut sample_entry = Vec::with_capacity(256); + + // This is a concatenation of the following boxes/classes. + + // SampleEntry, ISO/IEC 14496-12 section 8.5.2. + let avc1_len_pos = sample_entry.len(); + // length placeholder + type + reserved + data_reference_index = 1 + sample_entry.extend_from_slice(b"\x00\x00\x00\x00avc1\x00\x00\x00\x00\x00\x00\x00\x01"); + + // VisualSampleEntry, ISO/IEC 14496-12 section 12.1.3. + sample_entry.extend_from_slice(&[0; 16]); // pre-defined + reserved + sample_entry.write_u16::(width)?; + sample_entry.write_u16::(height)?; + sample_entry.extend_from_slice(&[ + 0x00, 0x48, 0x00, 0x00, // horizresolution + 0x00, 0x48, 0x00, 0x00, // vertresolution + 0x00, 0x00, 0x00, 0x00, // reserved + 0x00, 0x01, // frame count + 0x00, 0x00, 0x00, 0x00, // compressorname + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x18, 0xff, 0xff, // depth + pre_defined + ]); + + // AVCSampleEntry, ISO/IEC 14496-15 section 5.3.4.1. + // AVCConfigurationBox, ISO/IEC 14496-15 section 5.3.4.1. + let avcc_len_pos = sample_entry.len(); + sample_entry.extend_from_slice(b"\x00\x00\x00\x00avcC"); + sample_entry.extend_from_slice(extradata); + + // Fix up avc1 and avcC box lengths. + let cur_pos = sample_entry.len(); + BigEndian::write_u32( + &mut sample_entry[avcc_len_pos..avcc_len_pos + 4], + u32::try_from(cur_pos - avcc_len_pos)?, + ); + + // PixelAspectRatioBox, ISO/IEC 14496-12 section 12.1.4.2. + // Write a PixelAspectRatioBox if necessary, as the sub streams can be be anamorphic. + let pasp = sps + .vui_parameters + .as_ref() + .and_then(|v| v.aspect_ratio_info.as_ref()) + .and_then(|a| a.clone().get()) + .unwrap_or_else(|| default_pixel_aspect_ratio(width, height)); + if pasp != (1, 1) { + sample_entry.extend_from_slice(b"\x00\x00\x00\x10pasp"); // length + box name + sample_entry.write_u32::(pasp.0.into())?; + sample_entry.write_u32::(pasp.1.into())?; + } + + let cur_pos = sample_entry.len(); + BigEndian::write_u32( + &mut sample_entry[avc1_len_pos..avc1_len_pos + 4], + u32::try_from(cur_pos - avc1_len_pos)?, + ); + + let profile_idc = sample_entry[103]; + let constraint_flags = sample_entry[104]; + let level_idc = sample_entry[105]; + + let rfc6381_codec = format!( + "avc1.{:02x}{:02x}{:02x}", + profile_idc, constraint_flags, level_idc + ); + Ok(VideoSampleEntryToInsert { + data: sample_entry, + rfc6381_codec, + width, + height, + pasp_h_spacing: pasp.0, + pasp_v_spacing: pasp.1, + }) } #[cfg(test)] mod tests { use db::testutil; - #[rustfmt::skip] - const ANNEX_B_TEST_INPUT: [u8; 35] = [ - 0x00, 0x00, 0x00, 0x01, 0x67, 0x4d, 0x00, 0x1f, - 0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01, - 0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00, - 0x1d, 0x4c, 0x01, 0x00, 0x00, 0x00, 0x01, 0x68, - 0xee, 0x3c, 0x80, - ]; - #[rustfmt::skip] const AVC_DECODER_CONFIG_TEST_INPUT: [u8; 38] = [ 0x01, 0x4d, 0x00, 0x1f, 0xff, 0xe1, 0x00, 0x17, @@ -347,79 +205,14 @@ mod tests { 0x68, 0xee, 0x3c, 0x80, ]; - #[test] - fn test_decode() { - testutil::init(); - let data = &ANNEX_B_TEST_INPUT; - let mut pieces = Vec::new(); - super::decode_h264_annex_b(data, |p| { - pieces.push(p); - Ok(()) - }) - .unwrap(); - assert_eq!(&pieces, &[&data[4..27], &data[31..]]); - } - #[test] fn test_sample_entry_from_avc_decoder_config() { testutil::init(); - let e = super::ExtraData::parse(&AVC_DECODER_CONFIG_TEST_INPUT, 1280, 720).unwrap(); - assert_eq!(&e.entry.data[..], &TEST_OUTPUT[..]); - assert_eq!(e.entry.width, 1280); - assert_eq!(e.entry.height, 720); - assert_eq!(e.entry.rfc6381_codec, "avc1.4d001f"); - assert_eq!(e.need_transform, false); - } - - #[test] - fn test_sample_entry_from_annex_b() { - testutil::init(); - let e = super::ExtraData::parse(&ANNEX_B_TEST_INPUT, 1280, 720).unwrap(); - assert_eq!(e.entry.width, 1280); - assert_eq!(e.entry.height, 720); - assert_eq!(e.entry.rfc6381_codec, "avc1.4d001f"); - assert_eq!(e.need_transform, true); - } - - #[test] - fn test_transform_sample_data() { - testutil::init(); - #[rustfmt::skip] - const INPUT: [u8; 64] = [ - 0x00, 0x00, 0x00, 0x01, 0x67, 0x4d, 0x00, 0x1f, - 0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01, - 0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00, - 0x1d, 0x4c, 0x01, - - 0x00, 0x00, 0x00, 0x01, 0x68, 0xee, 0x3c, 0x80, - - 0x00, 0x00, 0x00, 0x01, 0x06, 0x06, 0x01, 0xc4, - 0x80, - - 0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x80, 0x10, - 0x00, 0x08, 0x7f, 0x00, 0x5d, 0x27, 0xb5, 0xc1, - 0xff, 0x8c, 0xd6, 0x35, - // (truncated) - ]; - #[rustfmt::skip] - const EXPECTED_OUTPUT: [u8; 64] = [ - 0x00, 0x00, 0x00, 0x17, 0x67, 0x4d, 0x00, 0x1f, - 0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01, - 0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00, - 0x1d, 0x4c, 0x01, - - 0x00, 0x00, 0x00, 0x04, 0x68, 0xee, 0x3c, 0x80, - - 0x00, 0x00, 0x00, 0x05, 0x06, 0x06, 0x01, 0xc4, - 0x80, - - 0x00, 0x00, 0x00, 0x10, 0x65, 0x88, 0x80, 0x10, - 0x00, 0x08, 0x7f, 0x00, 0x5d, 0x27, 0xb5, 0xc1, - 0xff, 0x8c, 0xd6, 0x35, - ]; - let mut out = Vec::new(); - super::transform_sample_data(&INPUT, &mut out).unwrap(); - assert_eq!(&out[..], &EXPECTED_OUTPUT[..]); + let e = super::parse_extra_data(&AVC_DECODER_CONFIG_TEST_INPUT).unwrap(); + assert_eq!(&e.data[..], &TEST_OUTPUT[..]); + assert_eq!(e.width, 1280); + assert_eq!(e.height, 720); + assert_eq!(e.rfc6381_codec, "avc1.4d001f"); } #[test] diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 265259f..5d32348 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -1984,7 +1984,7 @@ impl fmt::Debug for File { #[cfg(test)] mod tests { use super::*; - use crate::stream::{self, Opener}; + use crate::stream; use base::clock::RealClocks; use byteorder::{BigEndian, ByteOrder}; use db::recording::{self, TIME_UNITS_PER_SEC}; @@ -2278,20 +2278,13 @@ mod tests { } fn copy_mp4_to_db(db: &mut TestDb) { - let (extra_data, mut input) = stream::FFMPEG - .open( - "test".to_owned(), - stream::Source::File("src/testdata/clip.mp4"), - ) - .unwrap(); + let (extra_data, input) = + stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let mut input: Box = Box::new(input); // 2015-04-26 00:00:00 UTC. const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - let video_sample_entry_id = db - .db - .lock() - .insert_video_sample_entry(extra_data.entry) - .unwrap(); + let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); let mut output = writer::Writer::new( dir, @@ -2324,7 +2317,7 @@ mod tests { output .write( &mut db.shutdown_rx, - pkt.data, + &pkt.data[..], frame_time, pkt.pts, pkt.is_key, @@ -2398,18 +2391,28 @@ mod tests { } fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) { - let (orig_extra_data, mut orig) = stream::FFMPEG - .open( - "test".to_owned(), - stream::Source::File("src/testdata/clip.mp4"), - ) - .unwrap(); - let (new_extra_data, mut new) = stream::FFMPEG - .open("test".to_owned(), stream::Source::File(new_filename)) - .unwrap(); + let (orig_extra_data, orig) = + stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap(); + + if pts_offset > 0 { + // The mp4 crate doesn't interpret the edit list. Manually inspect it. + let elst = new.elst().unwrap(); + assert_eq!( + &elst.entries, + &[mp4::mp4box::elst::ElstEntry { + segment_duration: new.duration(), + media_time: pts_offset as u64, + media_rate: mp4::FixedPointU16::new(1), + }] + ); + } + + let mut orig: Box = Box::new(orig); + let mut new: Box = Box::new(new); assert_eq!(orig_extra_data, new_extra_data); let mut final_durations = None; - loop { + for i in 0.. { let orig_pkt = match orig.next() { Ok(p) => Some(p), Err(e) if e.to_string() == "End of file" => None, @@ -2431,9 +2434,13 @@ mod tests { (None, None) => break, (o, n) => panic!("orig: {} new: {}", o.is_some(), n.is_some()), }; - assert_eq!(orig_pkt.pts, new_pkt.pts + pts_offset); - assert_eq!(orig_pkt.data, new_pkt.data); - assert_eq!(orig_pkt.is_key, new_pkt.is_key); + assert_eq!( + orig_pkt.pts, new_pkt.pts, /*+ pts_offset*/ + "pkt {} pts", + i + ); + assert_eq!(orig_pkt.data, new_pkt.data, "pkt {} data", i); + assert_eq!(orig_pkt.is_key, new_pkt.is_key, "pkt {} key", i); final_durations = Some((i64::from(orig_pkt.duration), i64::from(new_pkt.duration))); } diff --git a/server/src/stream.rs b/server/src/stream.rs index 83fab29..c225626 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -3,96 +3,37 @@ // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. use crate::h264; -use cstr::cstr; +use bytes::Bytes; use failure::format_err; use failure::{bail, Error}; use futures::StreamExt; -use lazy_static::lazy_static; -use log::warn; -use retina::client::{Credentials, Transport}; +use retina::client::Demuxed; use retina::codec::{CodecItem, VideoParameters}; -use serde::Deserialize; -use std::convert::TryFrom; -use std::ffi::CString; use std::pin::Pin; use std::result::Result; -use std::sync::Arc; use url::Url; -static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); - static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); -lazy_static! { - pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); -} - -#[derive(Copy, Clone, Debug, Deserialize)] -pub enum RtspLibrary { - Ffmpeg, - Retina, -} - -impl Default for RtspLibrary { - fn default() -> Self { - RtspLibrary::Retina - } -} - -impl std::str::FromStr for RtspLibrary { - type Err = Error; - - fn from_str(s: &str) -> Result { - Ok(match s { - "ffmpeg" => RtspLibrary::Ffmpeg, - "retina" => RtspLibrary::Retina, - _ => bail!("unknown RTSP library {:?}", s), - }) - } -} - -impl RtspLibrary { - pub fn opener(&self) -> &'static dyn Opener { - match self { - RtspLibrary::Ffmpeg => &*FFMPEG, - RtspLibrary::Retina => &RETINA, - } - } -} - -#[cfg(test)] -pub enum Source<'a> { - /// A filename, for testing. - File(&'a str), - - /// An RTSP stream, for production use. - Rtsp { - url: Url, - username: Option, - password: Option, - transport: Transport, - session_group: Arc, - }, -} - -#[cfg(not(test))] -pub enum Source { - /// An RTSP stream, for production use. - Rtsp { - url: Url, - username: Option, - password: Option, - transport: Transport, - session_group: Arc, - }, -} - +/// Opens a RTSP stream. This is a trait for test injection. pub trait Opener: Send + Sync { - fn open(&self, label: String, src: Source) - -> Result<(h264::ExtraData, Box), Error>; + /// Opens the given RTSP URL. + /// + /// Note: despite the blocking interface, this expects to be called from + /// a tokio runtime with IO and time enabled. Takes the + /// [`tokio::runtime::Runtime`] rather than using + /// `tokio::runtime::Handle::current()` because `Runtime::block_on` can + /// drive IO and timers while `Handle::block_on` can not. + fn open<'a>( + &self, + rt: &'a tokio::runtime::Runtime, + label: String, + url: Url, + options: retina::client::SessionOptions, + ) -> Result<(db::VideoSampleEntryToInsert, Box), Error>; } -pub struct VideoFrame<'a> { +pub struct VideoFrame { pub pts: i64, /// An estimate of the duration of the frame, or zero. @@ -100,308 +41,54 @@ pub struct VideoFrame<'a> { pub duration: i32, pub is_key: bool, - pub data: &'a [u8], + pub data: Bytes, } pub trait Stream: Send { fn next(&mut self) -> Result; } -pub struct Ffmpeg {} +pub struct RealOpener; -impl Ffmpeg { - fn new() -> Ffmpeg { - START_FFMPEG.call_once(|| { - ffmpeg::Ffmpeg::new(); - }); - Ffmpeg {} - } -} +pub const OPENER: RealOpener = RealOpener; -impl Opener for Ffmpeg { - fn open( +impl Opener for RealOpener { + fn open<'a>( &self, + rt: &'a tokio::runtime::Runtime, label: String, - src: Source, - ) -> Result<(h264::ExtraData, Box), Error> { - use ffmpeg::avformat::InputFormatContext; - let mut input = match src { - #[cfg(test)] - Source::File(filename) => { - let mut open_options = ffmpeg::avutil::Dictionary::new(); - - // Work around https://github.com/scottlamb/moonfire-nvr/issues/10 - open_options - .set(cstr!("advanced_editlist"), cstr!("false")) - .unwrap(); - let url = format!("file:{}", filename); - let i = InputFormatContext::open( - &CString::new(url.clone()).unwrap(), - &mut open_options, - )?; - if !open_options.empty() { - warn!( - "{}: While opening URL {}, some options were not understood: {}", - &label, url, open_options - ); - } - i - } - Source::Rtsp { - url, - username, - password, - transport, - .. - } => { - let mut open_options = ffmpeg::avutil::Dictionary::new(); - open_options - .set( - cstr!("rtsp_transport"), - match transport { - Transport::Tcp => cstr!("tcp"), - Transport::Udp => cstr!("udp"), - }, - ) - .unwrap(); - open_options - .set(cstr!("user-agent"), cstr!("moonfire-nvr")) - .unwrap(); - - // 10-second socket timeout, in microseconds. - open_options - .set(cstr!("stimeout"), cstr!("10000000")) - .unwrap(); - - // Without this option, the first packet has an incorrect pts. - // https://trac.ffmpeg.org/ticket/5018 - open_options - .set(cstr!("fflags"), cstr!("nobuffer")) - .unwrap(); - - // Moonfire NVR currently only supports video, so receiving audio is wasteful. - // It also triggers . - open_options - .set(cstr!("allowed_media_types"), cstr!("video")) - .unwrap(); - - let mut url_with_credentials = url.clone(); - if let Some(u) = username.as_deref() { - url_with_credentials - .set_username(u) - .map_err(|_| format_err!("unable to set username on url {}", url))?; - } - url_with_credentials - .set_password(password.as_deref()) - .map_err(|_| format_err!("unable to set password on url {}", url))?; - let i = InputFormatContext::open( - &CString::new(url_with_credentials.as_str())?, - &mut open_options, - )?; - if !open_options.empty() { - warn!( - "{}: While opening URL {}, some options were not understood: {}", - &label, url, open_options - ); - } - i - } - }; - - input.find_stream_info()?; - - // Find the video stream. - let mut video_i = None; - { - let s = input.streams(); - for i in 0..s.len() { - if s.get(i).codecpar().codec_type().is_video() { - video_i = Some(i); - break; - } - } - } - let video_i = match video_i { - Some(i) => i, - None => bail!("no video stream"), - }; - - let video = input.streams().get(video_i); - let codec = video.codecpar(); - let codec_id = codec.codec_id(); - if !codec_id.is_h264() { - bail!("stream's video codec {:?} is not h264", codec_id); - } - let tb = video.time_base(); - if tb.num != 1 || tb.den != 90000 { - bail!( - "video stream has timebase {}/{}; expected 1/90000", - tb.num, - tb.den - ); - } - let dims = codec.dims(); - let extra_data = h264::ExtraData::parse( - codec.extradata(), - u16::try_from(dims.width)?, - u16::try_from(dims.height)?, - )?; - let need_transform = extra_data.need_transform; - let stream = Box::new(FfmpegStream { - input, - video_i, - data: Vec::new(), - need_transform, - }); - Ok((extra_data, stream)) - } -} - -struct FfmpegStream { - input: ffmpeg::avformat::InputFormatContext<'static>, - video_i: usize, - data: Vec, - need_transform: bool, -} - -impl Stream for FfmpegStream { - fn next(&mut self) -> Result { - let pkt = loop { - let pkt = self.input.read_frame()?; - if pkt.stream_index() == self.video_i { - break pkt; - } - }; - let data = pkt - .data() - .ok_or_else(|| format_err!("packet with no data"))?; - if self.need_transform { - h264::transform_sample_data(data, &mut self.data)?; - } else { - // This copy isn't strictly necessary, but this path is only taken in testing anyway. - self.data.clear(); - self.data.extend_from_slice(data); - } - let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?; - Ok(VideoFrame { - pts, - is_key: pkt.is_key(), - duration: pkt.duration(), - data: &self.data, - }) - } -} - -pub struct RetinaOpener {} - -pub const RETINA: RetinaOpener = RetinaOpener {}; - -impl Opener for RetinaOpener { - fn open( - &self, - label: String, - src: Source, - ) -> Result<(h264::ExtraData, Box), Error> { - let (startup_tx, startup_rx) = tokio::sync::oneshot::channel(); - let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1); - let handle = tokio::runtime::Handle::current(); - let (url, options) = match src { - #[cfg(test)] - Source::File(_) => bail!("Retina doesn't support .mp4 files"), - Source::Rtsp { - url, - username, - password, - transport, - session_group, - } => ( - url, - retina::client::SessionOptions::default() - .creds(match (username, password) { - (None, None) => None, - (Some(username), password) => Some(Credentials { - username, - password: password.unwrap_or_default(), - }), - _ => bail!("must supply username when supplying password"), - }) - .transport(transport) - .session_group(session_group) - .user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))), - ), - }; - - handle.spawn(async move { - let r = tokio::time::timeout(RETINA_TIMEOUT, RetinaOpener::play(url, options)).await; - let (mut session, video_params, first_frame) = - match r.unwrap_or_else(|_| Err(format_err!("timeout opening stream"))) { - Err(e) => { - let _ = startup_tx.send(Err(e)); - return; - } - Ok((s, p, f)) => (s, p, f), - }; - if startup_tx.send(Ok(video_params)).is_err() { - return; - } - if frame_tx.send(Ok(first_frame)).await.is_err() { - return; - } - - // Read following frames. - let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; - loop { - match tokio::time::timeout_at(deadline, session.next()).await { - Err(_) => { - let _ = frame_tx - .send(Err(format_err!("timeout getting next frame"))) - .await; - return; - } - Ok(Some(Err(e))) => { - let _ = frame_tx.send(Err(e.into())).await; - return; - } - Ok(None) => break, - Ok(Some(Ok(CodecItem::VideoFrame(v)))) => { - if let Some(p) = v.new_parameters { - // TODO: we could start a new recording without dropping the connection. - let _ = frame_tx.send(Err(format_err!("parameter; change: {:?}", p))); - return; - } - deadline = tokio::time::Instant::now() + RETINA_TIMEOUT; - if v.loss > 0 { - log::warn!( - "{}: lost {} RTP packets @ {}", - &label, - v.loss, - v.start_ctx() - ); - } - if frame_tx.send(Ok(v)).await.is_err() { - return; // other end died. - } - } - Ok(Some(Ok(_))) => {} - } - } - }); - let video_params = handle.block_on(startup_rx)??; - let dims = video_params.pixel_dimensions(); - let extra_data = h264::ExtraData::parse( - video_params.extra_data(), - u16::try_from(dims.0)?, - u16::try_from(dims.1)?, - )?; + url: Url, + options: retina::client::SessionOptions, + ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { + let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); + let (session, video_params, first_frame) = rt.block_on(tokio::time::timeout( + RETINA_TIMEOUT, + RetinaStream::play(url, options), + ))??; + let extra_data = h264::parse_extra_data(video_params.extra_data())?; let stream = Box::new(RetinaStream { - frame_rx, - frame: None, + rt, + label, + session, + first_frame: Some(first_frame), }); Ok((extra_data, stream)) } } -impl RetinaOpener { +struct RetinaStream<'a> { + rt: &'a tokio::runtime::Runtime, + label: String, + session: Pin>, + + /// The first frame, if not yet returned from `next`. + /// + /// This frame is special because we sometimes need to fetch it as part of getting the video + /// parameters. + first_frame: Option, +} + +impl<'a> RetinaStream<'a> { /// Plays to first frame. No timeout; that's the caller's responsibility. async fn play( url: Url, @@ -459,27 +146,119 @@ impl RetinaOpener { first_frame, )) } + + /// Fetches a non-initial frame. + async fn fetch_next_frame( + label: &str, + mut session: Pin<&mut Demuxed>, + ) -> Result { + loop { + match session.next().await.transpose()? { + None => bail!("end of stream"), + Some(CodecItem::VideoFrame(v)) => { + if let Some(p) = v.new_parameters { + // TODO: we could start a new recording without dropping the connection. + bail!("parameter change: {:?}", p); + } + if v.loss > 0 { + log::warn!( + "{}: lost {} RTP packets @ {}", + &label, + v.loss, + v.start_ctx() + ); + } + return Ok(v); + } + Some(_) => {} + } + } + } } -struct RetinaStream { - frame_rx: tokio::sync::mpsc::Receiver>, - frame: Option, -} - -impl Stream for RetinaStream { +impl<'a> Stream for RetinaStream<'a> { fn next(&mut self) -> Result { - // TODO: use Option::insert after bumping MSRV to 1.53. - self.frame = Some( - self.frame_rx - .blocking_recv() - .ok_or_else(|| format_err!("stream ended"))??, - ); - let frame = self.frame.as_ref().unwrap(); + let frame = self.first_frame.take().map(Ok).unwrap_or_else(|| { + self.rt + .block_on(tokio::time::timeout( + RETINA_TIMEOUT, + RetinaStream::fetch_next_frame(&self.label, self.session.as_mut()), + )) + .map_err(|_| format_err!("timeout getting next frame"))? + })?; Ok(VideoFrame { pts: frame.timestamp.elapsed(), duration: 0, is_key: frame.is_random_access_point, - data: &frame.data()[..], + data: frame.into_data(), }) } } + +#[cfg(test)] +pub mod testutil { + use super::*; + use std::convert::TryFrom; + use std::io::Cursor; + + pub struct Mp4Stream { + reader: mp4::Mp4Reader>>, + h264_track_id: u32, + next_sample_id: u32, + } + + impl Mp4Stream { + /// Opens a stream, with a return matching that expected by [`Opener`]. + pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> { + let f = std::fs::read(path)?; + let len = f.len(); + let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?; + let h264_track = match reader + .tracks() + .values() + .find(|t| matches!(t.media_type(), Ok(mp4::MediaType::H264))) + { + None => bail!("expected a H.264 track"), + Some(t) => t, + }; + let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?; + let h264_track_id = h264_track.track_id(); + let stream = Mp4Stream { + reader, + h264_track_id, + next_sample_id: 1, + }; + Ok((extra_data, stream)) + } + + pub fn duration(&self) -> u64 { + self.reader.moov.mvhd.duration + } + + /// Returns the edit list from the H.264 stream, if any. + pub fn elst(&self) -> Option<&mp4::mp4box::elst::ElstBox> { + let h264_track = self.reader.tracks().get(&self.h264_track_id).unwrap(); + h264_track + .trak + .edts + .as_ref() + .and_then(|edts| edts.elst.as_ref()) + } + } + + impl Stream for Mp4Stream { + fn next(&mut self) -> Result { + let sample = self + .reader + .read_sample(self.h264_track_id, self.next_sample_id)? + .ok_or_else(|| format_err!("End of file"))?; + self.next_sample_id += 1; + Ok(VideoFrame { + pts: sample.start_time as i64, + duration: sample.duration as i32, + is_key: sample.is_sync, + data: sample.bytes, + }) + } + } +} diff --git a/server/src/streamer.rs b/server/src/streamer.rs index dc0a741..33bcc4e 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -110,11 +110,15 @@ where } /// Runs the streamer; blocks. - /// Note that when using Retina as the RTSP library, this must be called - /// within a tokio runtime context; see [tokio::runtime::Handle]. pub fn run(&mut self) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + let _guard = rt.enter(); while self.shutdown_rx.check().is_ok() { - if let Err(e) = self.run_once() { + if let Err(e) = self.run_once(&rt) { let sleep_time = time::Duration::seconds(1); warn!( "{}: sleeping for {} after error: {}", @@ -128,7 +132,7 @@ where info!("{}: shutting down", self.short_name); } - fn run_once(&mut self) -> Result<(), Error> { + fn run_once(&mut self, rt: &tokio::runtime::Runtime) -> Result<(), Error> { info!("{}: Opening input: {}", self.short_name, self.url.as_str()); let clocks = self.db.clocks(); @@ -142,7 +146,7 @@ where max_expires.saturating_duration_since(tokio::time::Instant::now()), status.num_sessions ); - tokio::runtime::Handle::current().block_on(async { + rt.block_on(async { tokio::select! { _ = self.session_group.await_stale_sessions(&status) => Ok(()), _ = self.shutdown_rx.as_future() => Err(base::shutdown::ShutdownError), @@ -160,28 +164,26 @@ where let (extra_data, mut stream) = { let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); self.opener.open( + rt, self.short_name.clone(), - stream::Source::Rtsp { - url: self.url.clone(), - username: if self.username.is_empty() { + self.url.clone(), + retina::client::SessionOptions::default() + .creds(if self.username.is_empty() { None } else { - Some(self.username.clone()) - }, - password: if self.password.is_empty() { - None - } else { - Some(self.password.clone()) - }, - transport: self.transport, - session_group: self.session_group.clone(), - }, + Some(retina::client::Credentials { + username: self.username.clone(), + password: self.password.clone(), + }) + }) + .transport(self.transport) + .session_group(self.session_group.clone()), )? }; let realtime_offset = self.db.clocks().realtime() - clocks.monotonic(); let video_sample_entry_id = { let _t = TimerGuard::new(&clocks, || "inserting video sample entry"); - self.db.lock().insert_video_sample_entry(extra_data.entry)? + self.db.lock().insert_video_sample_entry(extra_data)? }; let mut seen_key_frame = false; @@ -253,7 +255,7 @@ where let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len())); w.write( &mut self.shutdown_rx, - pkt.data, + &pkt.data[..], local_time, pkt.pts, pkt.is_key, @@ -270,8 +272,7 @@ where #[cfg(test)] mod tests { - use crate::h264; - use crate::stream::{self, Opener, Stream}; + use crate::stream::{self, Stream}; use base::clock::{self, Clocks}; use db::{recording, testutil, CompositeId}; use failure::{bail, Error}; @@ -353,20 +354,19 @@ mod tests { struct MockOpener { expected_url: url::Url, - streams: Mutex)>>, + streams: Mutex)>>, shutdown_tx: Mutex>, } impl stream::Opener for MockOpener { fn open( &self, + _rt: &tokio::runtime::Runtime, _label: String, - src: stream::Source, - ) -> Result<(h264::ExtraData, Box), Error> { - match src { - stream::Source::Rtsp { url, .. } => assert_eq!(&url, &self.expected_url), - stream::Source::File(_) => panic!("expected rtsp url"), - }; + url: url::Url, + _options: retina::client::SessionOptions, + ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { + assert_eq!(&url, &self.expected_url); let mut l = self.streams.lock(); match l.pop() { Some(stream) => { @@ -412,13 +412,10 @@ mod tests { let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC - let (extra_data, stream) = stream::FFMPEG - .open( - "test".to_owned(), - stream::Source::File("src/testdata/clip.mp4"), - ) - .unwrap(); - let mut stream = ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), stream); + let (extra_data, stream) = + stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap(); + let mut stream = + ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream)); stream.ts_offset = 123456; // starting pts of the input should be irrelevant stream.ts_offset_pkts_left = u32::max_value(); stream.pkts_left = u32::max_value();