drop ffmpeg support

* switch the config interface over to use Retina and make the test
  button honor rtsp_transport = udp.

* adjust the threading model of the Retina streaming code.

  Before, it spawned a background future that read from the runtime and
  wrote to a channel. Other calls read from this channel.

  After, it does work directly from within the block_on calls (no
  channels).

  The immediate motivation was that the config interface didn't have
  another runtime handy. And passing in a current thread runtime
  deadlocked. I later learned this is a difference between
  Runtime::block_on and Handle::block_on. The former will drive IO and
  timers; the latter will not.

  But this is also more efficient to avoid so many thread hand-offs.
  Both the context switches and the extra spinning that
  tokio appears to do as mentioned here:
  https://github.com/scottlamb/retina/issues/5#issuecomment-871971550

  This may not be the final word on the threading model. Eventually
  I may not have per-stream writing threads at all. But I think it will
  be easier to look at this after getting rid of the separate
  `moonfire-nvr config` subcommand in favor of a web interface.

* in tests, read `.mp4` files via the `mp4` crate rather than ffmpeg.
  The annoying part is that this doesn't parse edit lists; oh well.

* simplify the `Opener` interface. Formerly, it'd take either a RTSP
  URL or a path to a `.mp4` file, and they'd share some code because
  they both sometimes used ffmpeg. Now, they're totally different
  libraries (`retina` vs `mp4`). Pull the latter out to a `testutil`
  module with a different interface that exposes more of the `mp4`
  stuff. Now `Opener` is just for RTSP.

* simplify the h264 module. It had a lot of logic to deal with Annex B.
  Retina doesn't use this encoding.

Fixes #36
Fixes #126
This commit is contained in:
Scott Lamb
2022-03-18 10:30:23 -07:00
parent be3a5b200e
commit 307a3884a0
14 changed files with 403 additions and 840 deletions

42
server/Cargo.lock generated
View File

@@ -1078,19 +1078,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "moonfire-ffmpeg"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccb80b7a294201578799191cd66b077cd90cef999f461edc7919465fc118a135"
dependencies = [
"cc",
"libc",
"log",
"parking_lot",
"pkg-config",
]
[[package]]
name = "moonfire-nvr"
version = "0.7.2"
@@ -1100,7 +1087,6 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"cstr",
"cursive",
"failure",
"fnv",
@@ -1115,7 +1101,7 @@ dependencies = [
"memchr",
"moonfire-base",
"moonfire-db",
"moonfire-ffmpeg",
"mp4",
"mylog",
"nix",
"nom",
@@ -1144,6 +1130,19 @@ dependencies = [
"uuid",
]
[[package]]
name = "mp4"
version = "0.9.2"
source = "git+https://github.com/scottlamb/mp4-rust?branch=moonfire#388fb47653305fb153de4e11d8cbc6f307e02ee5"
dependencies = [
"byteorder",
"bytes",
"num-rational",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "mp4ra-rust"
version = "0.1.0"
@@ -1227,6 +1226,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.4.0"
@@ -1264,8 +1274,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
"serde",
]
[[package]]

View File

@@ -26,11 +26,9 @@ blake3 = "1.0.0"
bytes = "1"
byteorder = "1.0"
clap = { version = "2.33.3", default-features = false, features = ["color", "wrap_help"] }
cstr = "0.2.5"
cursive = "0.17.0"
db = { package = "moonfire-db", path = "db" }
failure = "0.1.1"
ffmpeg = { package = "moonfire-ffmpeg", version = "0.0.2" }
futures = "0.3"
fnv = "1.0"
h264-reader = "0.5.0"
@@ -66,6 +64,7 @@ url = "2.1.1"
uuid = { version = "0.8", features = ["serde", "std", "v4"] }
[dev-dependencies]
mp4 = { git = "https://github.com/scottlamb/mp4-rust", branch = "moonfire" }
num-rational = { version = "0.4.0", default-features = false, features = ["std"] }
reqwest = { version = "0.11.0", default-features = false, features = ["json"] }
tempfile = "3.2.0"

View File

@@ -10,6 +10,7 @@ use cursive::Cursive;
use db::writer;
use failure::{bail, format_err, Error, ResultExt};
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
@@ -202,32 +203,44 @@ fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, id: Option<i32>) {
}
}
fn press_test_inner(url: Url, username: String, password: String) -> Result<String, Error> {
let pass_creds = !username.is_empty();
let (extra_data, _stream) = stream::FFMPEG.open(
fn press_test_inner(
url: Url,
username: String,
password: String,
transport: retina::client::Transport,
) -> Result<String, Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;
let _guard = rt.enter();
let (extra_data, _stream) = stream::OPENER.open(
&rt,
"test stream".to_owned(),
stream::Source::Rtsp {
url,
username: if pass_creds { Some(username) } else { None },
password: if pass_creds { Some(password) } else { None },
transport: retina::client::Transport::Tcp,
session_group: Default::default(),
},
url,
retina::client::SessionOptions::default()
.creds(if username.is_empty() {
None
} else {
Some(retina::client::Credentials { username, password })
})
.transport(transport),
)?;
Ok(format!(
"{}x{} video stream",
extra_data.entry.width, extra_data.entry.height
extra_data.width, extra_data.height
))
}
fn press_test(siv: &mut Cursive, t: db::StreamType) {
let c = get_camera(siv);
let url = &c.streams[t.index()].url;
let url = match parse_url(url, &["rtsp"]) {
let s = &c.streams[t.index()];
let transport = retina::client::Transport::from_str(s.rtsp_transport).unwrap_or_default();
let url = match parse_url(&s.url, &["rtsp"]) {
Ok(Some(u)) => u,
_ => panic!(
"test button should only be enabled with valid URL, not {:?}",
url
&s.url
),
};
let username = c.username;
@@ -248,7 +261,7 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) {
siv.set_fps(5);
let sink = siv.cb_sink().clone();
::std::thread::spawn(move || {
let r = press_test_inner(url.clone(), username, password);
let r = press_test_inner(url.clone(), username, password, transport);
sink.send(Box::new(move |siv: &mut Cursive| {
// Polling is no longer necessary.
siv.set_fps(0);

View File

@@ -35,13 +35,6 @@ pub struct ConfigFile {
/// Defaults to the number of cores on the system.
#[serde(default)]
pub worker_threads: Option<usize>,
/// RTSP library to use for fetching the cameras' video stream.
/// Moonfire NVR is in the process of switching from `ffmpeg` (used since
/// the beginning of the project) to `retina` (a pure-Rust RTSP library
/// developed by Moonfire NVR's author).
#[serde(default)]
pub rtsp_library: crate::stream::RtspLibrary,
}
/// Per-bind configuration.

View File

@@ -270,7 +270,7 @@ async fn inner(
let streams = l.streams_by_id().len();
let env = streamer::Environment {
db: &db,
opener: config.rtsp_library.opener(),
opener: &crate::stream::OPENER,
shutdown_rx: &shutdown_rx,
};
@@ -302,7 +302,6 @@ async fn inner(
}
// Then start up streams.
let handle = tokio::runtime::Handle::current();
let l = db.lock();
for (i, (id, stream)) in l.streams_by_id().iter().enumerate() {
if stream.config.mode != db::json::STREAM_MODE_RECORD {
@@ -340,14 +339,10 @@ async fn inner(
)?;
info!("Starting streamer for {}", streamer.short_name());
let name = format!("s-{}", streamer.short_name());
let handle = handle.clone();
streamers.push(
thread::Builder::new()
.name(name)
.spawn(move || {
let _enter = handle.enter();
streamer.run();
})
.spawn(move || streamer.run())
.expect("can't create thread"),
);
}
@@ -397,7 +392,9 @@ async fn inner(
let db = db.clone();
move || {
for streamer in streamers.drain(..) {
streamer.join().unwrap();
if streamer.join().is_err() {
log::error!("streamer panicked; look for previous panic message");
}
}
if let Some(mut ss) = syncers {
// The syncers shut down when all channels to them have been dropped.

View File

@@ -19,16 +19,10 @@
//! would be more trouble than it's worth.
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use db::VideoSampleEntryToInsert;
use failure::{bail, format_err, Error};
use std::convert::TryFrom;
// See ISO/IEC 14496-10 table 7-1 - NAL unit type codes, syntax element categories, and NAL unit
// type classes.
const NAL_UNIT_SEQ_PARAMETER_SET: u8 = 7;
const NAL_UNIT_PIC_PARAMETER_SET: u8 = 8;
const NAL_UNIT_TYPE_MASK: u8 = 0x1F; // bottom 5 bits of first byte of unit.
// For certain common sub stream anamorphic resolutions, add a pixel aspect ratio box.
// Assume the camera is 16x9. These are just the standard wide mode; default_pixel_aspect_ratio
// tries the transpose also.
@@ -49,7 +43,7 @@ const PIXEL_ASPECT_RATIOS: [((u16, u16), (u16, u16)); 6] = [
/// doesn't tell us what to do.
///
/// Note that at least in the case of .mp4 muxing, we don't need to fix up the underlying SPS.
/// SPS; PixelAspectRatioBox's definition says that it overrides the H.264-level declaration.
/// PixelAspectRatioBox's definition says that it overrides the H.264-level declaration.
fn default_pixel_aspect_ratio(width: u16, height: u16) -> (u16, u16) {
if width >= height {
PIXEL_ASPECT_RATIOS
@@ -66,257 +60,121 @@ fn default_pixel_aspect_ratio(width: u16, height: u16) -> (u16, u16) {
}
}
/// Decodes a H.264 Annex B byte stream into NAL units. Calls `f` for each NAL unit in the byte
/// stream. Aborts if `f` returns error.
///
/// Note `f` is called with the encoded NAL form, not the RBSP. The NAL header byte and any
/// emulation prevention bytes will be present.
///
/// See ISO/IEC 14496-10 section B.2: Byte stream NAL unit decoding process.
/// This is a relatively simple, unoptimized implementation.
///
/// TODO: detect invalid byte streams. For example, several 0x00s not followed by a 0x01, a stream
/// stream not starting with 0x00 0x00 0x00 0x01, or an empty NAL unit.
fn decode_h264_annex_b<'a, F>(mut data: &'a [u8], mut f: F) -> Result<(), Error>
where
F: FnMut(&'a [u8]) -> Result<(), Error>,
{
let start_code = &b"\x00\x00\x01"[..];
use nom::FindSubstring;
'outer: while let Some(pos) = data.find_substring(start_code) {
let mut unit = &data[0..pos];
data = &data[pos + start_code.len()..];
// Have zero or more bytes that end in a start code. Strip out any trailing 0x00s and
// process the unit if there's anything left.
loop {
match unit.last() {
None => continue 'outer,
Some(b) if *b == 0 => {
unit = &unit[..unit.len() - 1];
}
Some(_) => break,
}
}
f(unit)?;
/// Parses the `AvcDecoderConfigurationRecord` in the "extra data".
pub fn parse_extra_data(extradata: &[u8]) -> Result<VideoSampleEntryToInsert, Error> {
let avcc = h264_reader::avcc::AvcDecoderConfigurationRecord::try_from(extradata)
.map_err(|e| format_err!("Bad AvcDecoderConfigurationRecord: {:?}", e))?;
if avcc.num_of_sequence_parameter_sets() != 1 {
bail!("Multiple SPSs!");
}
// No remaining start codes; likely a unit left.
if !data.is_empty() {
f(data)?;
}
Ok(())
}
/// Parses Annex B extra data, returning a tuple holding the `sps` and `pps` substrings.
fn parse_annex_b_extra_data(data: &[u8]) -> Result<(&[u8], &[u8]), Error> {
let mut sps = None;
let mut pps = None;
decode_h264_annex_b(data, |unit| {
let nal_type = (unit[0] as u8) & NAL_UNIT_TYPE_MASK;
match nal_type {
NAL_UNIT_SEQ_PARAMETER_SET => sps = Some(unit),
NAL_UNIT_PIC_PARAMETER_SET => pps = Some(unit),
_ => bail!("Expected SPS and PPS; got type {}", nal_type),
};
Ok(())
let ctx = avcc
.create_context(())
.map_err(|e| format_err!("Can't load SPS+PPS: {:?}", e))?;
let sps = ctx
.sps_by_id(h264_reader::nal::pps::ParamSetId::from_u32(0).unwrap())
.ok_or_else(|| format_err!("No SPS 0"))?;
let pixel_dimensions = sps
.pixel_dimensions()
.map_err(|e| format_err!("SPS has invalid pixel dimensions: {:?}", e))?;
let width = u16::try_from(pixel_dimensions.0).map_err(|_| {
format_err!(
"bad dimensions {}x{}",
pixel_dimensions.0,
pixel_dimensions.1
)
})?;
match (sps, pps) {
(Some(s), Some(p)) => Ok((s, p)),
_ => bail!("SPS and PPS must be specified"),
}
}
/// Parsed representation of ffmpeg's "extradata".
#[derive(Debug, PartialEq, Eq)]
pub struct ExtraData {
pub entry: db::VideoSampleEntryToInsert,
/// True iff sample data should be transformed from Annex B format to AVC format via a call to
/// `transform_sample_data`. (The assumption is that if the extra data was in Annex B format,
/// the sample data is also.)
pub need_transform: bool,
}
impl ExtraData {
/// Parses "extradata" from ffmpeg. This data may be in either Annex B format or AVC format.
pub fn parse(extradata: &[u8], width: u16, height: u16) -> Result<ExtraData, Error> {
let raw_sps_and_pps;
let need_transform;
let ctx;
let sps_owner;
let sps; // reference to either within ctx or to sps_owner.
if extradata.starts_with(b"\x00\x00\x00\x01") || extradata.starts_with(b"\x00\x00\x01") {
// ffmpeg supplied "extradata" in Annex B format.
let (s, p) = parse_annex_b_extra_data(extradata)?;
let rbsp = h264_reader::rbsp::decode_nal(&s[1..]);
sps_owner = h264_reader::nal::sps::SeqParameterSet::from_bytes(&rbsp)
.map_err(|e| format_err!("Bad SPS: {:?}", e))?;
sps = &sps_owner;
raw_sps_and_pps = Some((s, p));
need_transform = true;
} else {
// Assume "extradata" holds an AVCDecoderConfiguration.
need_transform = false;
raw_sps_and_pps = None;
let avcc = h264_reader::avcc::AvcDecoderConfigurationRecord::try_from(extradata)
.map_err(|e| format_err!("Bad AvcDecoderConfigurationRecord: {:?}", e))?;
if avcc.num_of_sequence_parameter_sets() != 1 {
bail!("Multiple SPSs!");
}
ctx = avcc
.create_context(())
.map_err(|e| format_err!("Can't load SPS+PPS: {:?}", e))?;
sps = ctx
.sps_by_id(h264_reader::nal::pps::ParamSetId::from_u32(0).unwrap())
.ok_or_else(|| format_err!("No SPS 0"))?;
};
let mut sample_entry = Vec::with_capacity(256);
// This is a concatenation of the following boxes/classes.
// SampleEntry, ISO/IEC 14496-12 section 8.5.2.
let avc1_len_pos = sample_entry.len();
// length placeholder + type + reserved + data_reference_index = 1
sample_entry.extend_from_slice(b"\x00\x00\x00\x00avc1\x00\x00\x00\x00\x00\x00\x00\x01");
// VisualSampleEntry, ISO/IEC 14496-12 section 12.1.3.
sample_entry.extend_from_slice(&[0; 16]); // pre-defined + reserved
sample_entry.write_u16::<BigEndian>(width)?;
sample_entry.write_u16::<BigEndian>(height)?;
sample_entry.extend_from_slice(&[
0x00, 0x48, 0x00, 0x00, // horizresolution
0x00, 0x48, 0x00, 0x00, // vertresolution
0x00, 0x00, 0x00, 0x00, // reserved
0x00, 0x01, // frame count
0x00, 0x00, 0x00, 0x00, // compressorname
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x18, 0xff, 0xff, // depth + pre_defined
]);
// AVCSampleEntry, ISO/IEC 14496-15 section 5.3.4.1.
// AVCConfigurationBox, ISO/IEC 14496-15 section 5.3.4.1.
let avcc_len_pos = sample_entry.len();
sample_entry.extend_from_slice(b"\x00\x00\x00\x00avcC");
if let Some((sps, pps)) = raw_sps_and_pps {
// Create the AVCDecoderConfiguration, ISO/IEC 14496-15 section 5.2.4.1.
// The beginning of the AVCDecoderConfiguration takes a few values from
// the SPS (ISO/IEC 14496-10 section 7.3.2.1.1). One caveat: that section
// defines the syntax in terms of RBSP, not NAL. The difference is the
// escaping of 00 00 01 and 00 00 02; see notes about
// "emulation_prevention_three_byte" in ISO/IEC 14496-10 section 7.4.
// It looks like 00 is not a valid value of profile_idc, so this distinction
// shouldn't be relevant here. And ffmpeg seems to ignore it.
sample_entry.push(1); // configurationVersion
sample_entry.push(sps[1]); // profile_idc . AVCProfileIndication
sample_entry.push(sps[2]); // ...misc bits... . profile_compatibility
sample_entry.push(sps[3]); // level_idc . AVCLevelIndication
// Hardcode lengthSizeMinusOne to 3, matching TransformSampleData's 4-byte
// lengths.
sample_entry.push(0xff);
// Only support one SPS and PPS.
// ffmpeg's ff_isom_write_avcc has the same limitation, so it's probably
// fine. This next byte is a reserved 0b111 + a 5-bit # of SPSs (1).
sample_entry.push(0xe1);
sample_entry.write_u16::<BigEndian>(u16::try_from(sps.len())?)?;
sample_entry.extend_from_slice(sps);
sample_entry.push(1); // # of PPSs.
sample_entry.write_u16::<BigEndian>(u16::try_from(pps.len())?)?;
sample_entry.extend_from_slice(pps);
} else {
sample_entry.extend_from_slice(extradata);
};
// Fix up avc1 and avcC box lengths.
let cur_pos = sample_entry.len();
BigEndian::write_u32(
&mut sample_entry[avcc_len_pos..avcc_len_pos + 4],
u32::try_from(cur_pos - avcc_len_pos)?,
);
// PixelAspectRatioBox, ISO/IEC 14496-12 section 12.1.4.2.
// Write a PixelAspectRatioBox if necessary, as the sub streams can be be anamorphic.
let pasp = sps
.vui_parameters
.as_ref()
.and_then(|v| v.aspect_ratio_info.as_ref())
.and_then(|a| a.clone().get())
.unwrap_or_else(|| default_pixel_aspect_ratio(width, height));
if pasp != (1, 1) {
sample_entry.extend_from_slice(b"\x00\x00\x00\x10pasp"); // length + box name
sample_entry.write_u32::<BigEndian>(pasp.0.into())?;
sample_entry.write_u32::<BigEndian>(pasp.1.into())?;
}
let cur_pos = sample_entry.len();
BigEndian::write_u32(
&mut sample_entry[avc1_len_pos..avc1_len_pos + 4],
u32::try_from(cur_pos - avc1_len_pos)?,
);
let profile_idc = sample_entry[103];
let constraint_flags = sample_entry[104];
let level_idc = sample_entry[105];
let rfc6381_codec = format!(
"avc1.{:02x}{:02x}{:02x}",
profile_idc, constraint_flags, level_idc
);
Ok(ExtraData {
entry: db::VideoSampleEntryToInsert {
data: sample_entry,
rfc6381_codec,
width,
height,
pasp_h_spacing: pasp.0,
pasp_v_spacing: pasp.1,
},
need_transform,
})
}
}
/// Transforms sample data from Annex B format to AVC format. Should be called on samples iff
/// `ExtraData::need_transform` is true. Uses an out parameter `avc_sample` rather than a return
/// so that memory allocations can be reused from sample to sample.
pub fn transform_sample_data(annexb_sample: &[u8], avc_sample: &mut Vec<u8>) -> Result<(), Error> {
// See AVCParameterSamples, ISO/IEC 14496-15 section 5.3.2.
avc_sample.clear();
// The output will be about as long as the input. Annex B stop codes require at least three
// bytes; many seem to be four. The output lengths are exactly four.
avc_sample.reserve(annexb_sample.len() + 4);
decode_h264_annex_b(annexb_sample, |unit| {
// 4-byte length; this must match ParseExtraData's lengthSizeMinusOne == 3.
avc_sample.write_u32::<BigEndian>(unit.len() as u32)?; // length
avc_sample.extend_from_slice(unit);
Ok(())
let height = u16::try_from(pixel_dimensions.1).map_err(|_| {
format_err!(
"bad dimensions {}x{}",
pixel_dimensions.0,
pixel_dimensions.1
)
})?;
Ok(())
let mut sample_entry = Vec::with_capacity(256);
// This is a concatenation of the following boxes/classes.
// SampleEntry, ISO/IEC 14496-12 section 8.5.2.
let avc1_len_pos = sample_entry.len();
// length placeholder + type + reserved + data_reference_index = 1
sample_entry.extend_from_slice(b"\x00\x00\x00\x00avc1\x00\x00\x00\x00\x00\x00\x00\x01");
// VisualSampleEntry, ISO/IEC 14496-12 section 12.1.3.
sample_entry.extend_from_slice(&[0; 16]); // pre-defined + reserved
sample_entry.write_u16::<BigEndian>(width)?;
sample_entry.write_u16::<BigEndian>(height)?;
sample_entry.extend_from_slice(&[
0x00, 0x48, 0x00, 0x00, // horizresolution
0x00, 0x48, 0x00, 0x00, // vertresolution
0x00, 0x00, 0x00, 0x00, // reserved
0x00, 0x01, // frame count
0x00, 0x00, 0x00, 0x00, // compressorname
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x00, 0x00, 0x00, //
0x00, 0x18, 0xff, 0xff, // depth + pre_defined
]);
// AVCSampleEntry, ISO/IEC 14496-15 section 5.3.4.1.
// AVCConfigurationBox, ISO/IEC 14496-15 section 5.3.4.1.
let avcc_len_pos = sample_entry.len();
sample_entry.extend_from_slice(b"\x00\x00\x00\x00avcC");
sample_entry.extend_from_slice(extradata);
// Fix up avc1 and avcC box lengths.
let cur_pos = sample_entry.len();
BigEndian::write_u32(
&mut sample_entry[avcc_len_pos..avcc_len_pos + 4],
u32::try_from(cur_pos - avcc_len_pos)?,
);
// PixelAspectRatioBox, ISO/IEC 14496-12 section 12.1.4.2.
// Write a PixelAspectRatioBox if necessary, as the sub streams can be be anamorphic.
let pasp = sps
.vui_parameters
.as_ref()
.and_then(|v| v.aspect_ratio_info.as_ref())
.and_then(|a| a.clone().get())
.unwrap_or_else(|| default_pixel_aspect_ratio(width, height));
if pasp != (1, 1) {
sample_entry.extend_from_slice(b"\x00\x00\x00\x10pasp"); // length + box name
sample_entry.write_u32::<BigEndian>(pasp.0.into())?;
sample_entry.write_u32::<BigEndian>(pasp.1.into())?;
}
let cur_pos = sample_entry.len();
BigEndian::write_u32(
&mut sample_entry[avc1_len_pos..avc1_len_pos + 4],
u32::try_from(cur_pos - avc1_len_pos)?,
);
let profile_idc = sample_entry[103];
let constraint_flags = sample_entry[104];
let level_idc = sample_entry[105];
let rfc6381_codec = format!(
"avc1.{:02x}{:02x}{:02x}",
profile_idc, constraint_flags, level_idc
);
Ok(VideoSampleEntryToInsert {
data: sample_entry,
rfc6381_codec,
width,
height,
pasp_h_spacing: pasp.0,
pasp_v_spacing: pasp.1,
})
}
#[cfg(test)]
mod tests {
use db::testutil;
#[rustfmt::skip]
const ANNEX_B_TEST_INPUT: [u8; 35] = [
0x00, 0x00, 0x00, 0x01, 0x67, 0x4d, 0x00, 0x1f,
0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01,
0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00,
0x1d, 0x4c, 0x01, 0x00, 0x00, 0x00, 0x01, 0x68,
0xee, 0x3c, 0x80,
];
#[rustfmt::skip]
const AVC_DECODER_CONFIG_TEST_INPUT: [u8; 38] = [
0x01, 0x4d, 0x00, 0x1f, 0xff, 0xe1, 0x00, 0x17,
@@ -347,79 +205,14 @@ mod tests {
0x68, 0xee, 0x3c, 0x80,
];
#[test]
fn test_decode() {
testutil::init();
let data = &ANNEX_B_TEST_INPUT;
let mut pieces = Vec::new();
super::decode_h264_annex_b(data, |p| {
pieces.push(p);
Ok(())
})
.unwrap();
assert_eq!(&pieces, &[&data[4..27], &data[31..]]);
}
#[test]
fn test_sample_entry_from_avc_decoder_config() {
testutil::init();
let e = super::ExtraData::parse(&AVC_DECODER_CONFIG_TEST_INPUT, 1280, 720).unwrap();
assert_eq!(&e.entry.data[..], &TEST_OUTPUT[..]);
assert_eq!(e.entry.width, 1280);
assert_eq!(e.entry.height, 720);
assert_eq!(e.entry.rfc6381_codec, "avc1.4d001f");
assert_eq!(e.need_transform, false);
}
#[test]
fn test_sample_entry_from_annex_b() {
testutil::init();
let e = super::ExtraData::parse(&ANNEX_B_TEST_INPUT, 1280, 720).unwrap();
assert_eq!(e.entry.width, 1280);
assert_eq!(e.entry.height, 720);
assert_eq!(e.entry.rfc6381_codec, "avc1.4d001f");
assert_eq!(e.need_transform, true);
}
#[test]
fn test_transform_sample_data() {
testutil::init();
#[rustfmt::skip]
const INPUT: [u8; 64] = [
0x00, 0x00, 0x00, 0x01, 0x67, 0x4d, 0x00, 0x1f,
0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01,
0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00,
0x1d, 0x4c, 0x01,
0x00, 0x00, 0x00, 0x01, 0x68, 0xee, 0x3c, 0x80,
0x00, 0x00, 0x00, 0x01, 0x06, 0x06, 0x01, 0xc4,
0x80,
0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x80, 0x10,
0x00, 0x08, 0x7f, 0x00, 0x5d, 0x27, 0xb5, 0xc1,
0xff, 0x8c, 0xd6, 0x35,
// (truncated)
];
#[rustfmt::skip]
const EXPECTED_OUTPUT: [u8; 64] = [
0x00, 0x00, 0x00, 0x17, 0x67, 0x4d, 0x00, 0x1f,
0x9a, 0x66, 0x02, 0x80, 0x2d, 0xff, 0x35, 0x01,
0x01, 0x01, 0x40, 0x00, 0x00, 0xfa, 0x00, 0x00,
0x1d, 0x4c, 0x01,
0x00, 0x00, 0x00, 0x04, 0x68, 0xee, 0x3c, 0x80,
0x00, 0x00, 0x00, 0x05, 0x06, 0x06, 0x01, 0xc4,
0x80,
0x00, 0x00, 0x00, 0x10, 0x65, 0x88, 0x80, 0x10,
0x00, 0x08, 0x7f, 0x00, 0x5d, 0x27, 0xb5, 0xc1,
0xff, 0x8c, 0xd6, 0x35,
];
let mut out = Vec::new();
super::transform_sample_data(&INPUT, &mut out).unwrap();
assert_eq!(&out[..], &EXPECTED_OUTPUT[..]);
let e = super::parse_extra_data(&AVC_DECODER_CONFIG_TEST_INPUT).unwrap();
assert_eq!(&e.data[..], &TEST_OUTPUT[..]);
assert_eq!(e.width, 1280);
assert_eq!(e.height, 720);
assert_eq!(e.rfc6381_codec, "avc1.4d001f");
}
#[test]

View File

@@ -1984,7 +1984,7 @@ impl fmt::Debug for File {
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::{self, Opener};
use crate::stream;
use base::clock::RealClocks;
use byteorder::{BigEndian, ByteOrder};
use db::recording::{self, TIME_UNITS_PER_SEC};
@@ -2278,20 +2278,13 @@ mod tests {
}
fn copy_mp4_to_db(db: &mut TestDb<RealClocks>) {
let (extra_data, mut input) = stream::FFMPEG
.open(
"test".to_owned(),
stream::Source::File("src/testdata/clip.mp4"),
)
.unwrap();
let (extra_data, input) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut input: Box<dyn stream::Stream> = Box::new(input);
// 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let video_sample_entry_id = db
.db
.lock()
.insert_video_sample_entry(extra_data.entry)
.unwrap();
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(extra_data).unwrap();
let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
let mut output = writer::Writer::new(
dir,
@@ -2324,7 +2317,7 @@ mod tests {
output
.write(
&mut db.shutdown_rx,
pkt.data,
&pkt.data[..],
frame_time,
pkt.pts,
pkt.is_key,
@@ -2398,18 +2391,28 @@ mod tests {
}
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let (orig_extra_data, mut orig) = stream::FFMPEG
.open(
"test".to_owned(),
stream::Source::File("src/testdata/clip.mp4"),
)
.unwrap();
let (new_extra_data, mut new) = stream::FFMPEG
.open("test".to_owned(), stream::Source::File(new_filename))
.unwrap();
let (orig_extra_data, orig) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let (new_extra_data, new) = stream::testutil::Mp4Stream::open(new_filename).unwrap();
if pts_offset > 0 {
// The mp4 crate doesn't interpret the edit list. Manually inspect it.
let elst = new.elst().unwrap();
assert_eq!(
&elst.entries,
&[mp4::mp4box::elst::ElstEntry {
segment_duration: new.duration(),
media_time: pts_offset as u64,
media_rate: mp4::FixedPointU16::new(1),
}]
);
}
let mut orig: Box<dyn stream::Stream> = Box::new(orig);
let mut new: Box<dyn stream::Stream> = Box::new(new);
assert_eq!(orig_extra_data, new_extra_data);
let mut final_durations = None;
loop {
for i in 0.. {
let orig_pkt = match orig.next() {
Ok(p) => Some(p),
Err(e) if e.to_string() == "End of file" => None,
@@ -2431,9 +2434,13 @@ mod tests {
(None, None) => break,
(o, n) => panic!("orig: {} new: {}", o.is_some(), n.is_some()),
};
assert_eq!(orig_pkt.pts, new_pkt.pts + pts_offset);
assert_eq!(orig_pkt.data, new_pkt.data);
assert_eq!(orig_pkt.is_key, new_pkt.is_key);
assert_eq!(
orig_pkt.pts, new_pkt.pts, /*+ pts_offset*/
"pkt {} pts",
i
);
assert_eq!(orig_pkt.data, new_pkt.data, "pkt {} data", i);
assert_eq!(orig_pkt.is_key, new_pkt.is_key, "pkt {} key", i);
final_durations = Some((i64::from(orig_pkt.duration), i64::from(new_pkt.duration)));
}

View File

@@ -3,96 +3,37 @@
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::h264;
use cstr::cstr;
use bytes::Bytes;
use failure::format_err;
use failure::{bail, Error};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::warn;
use retina::client::{Credentials, Transport};
use retina::client::Demuxed;
use retina::codec::{CodecItem, VideoParameters};
use serde::Deserialize;
use std::convert::TryFrom;
use std::ffi::CString;
use std::pin::Pin;
use std::result::Result;
use std::sync::Arc;
use url::Url;
static START_FFMPEG: parking_lot::Once = parking_lot::Once::new();
static RETINA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
lazy_static! {
pub static ref FFMPEG: Ffmpeg = Ffmpeg::new();
}
#[derive(Copy, Clone, Debug, Deserialize)]
pub enum RtspLibrary {
Ffmpeg,
Retina,
}
impl Default for RtspLibrary {
fn default() -> Self {
RtspLibrary::Retina
}
}
impl std::str::FromStr for RtspLibrary {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"ffmpeg" => RtspLibrary::Ffmpeg,
"retina" => RtspLibrary::Retina,
_ => bail!("unknown RTSP library {:?}", s),
})
}
}
impl RtspLibrary {
pub fn opener(&self) -> &'static dyn Opener {
match self {
RtspLibrary::Ffmpeg => &*FFMPEG,
RtspLibrary::Retina => &RETINA,
}
}
}
#[cfg(test)]
pub enum Source<'a> {
/// A filename, for testing.
File(&'a str),
/// An RTSP stream, for production use.
Rtsp {
url: Url,
username: Option<String>,
password: Option<String>,
transport: Transport,
session_group: Arc<retina::client::SessionGroup>,
},
}
#[cfg(not(test))]
pub enum Source {
/// An RTSP stream, for production use.
Rtsp {
url: Url,
username: Option<String>,
password: Option<String>,
transport: Transport,
session_group: Arc<retina::client::SessionGroup>,
},
}
/// Opens a RTSP stream. This is a trait for test injection.
pub trait Opener: Send + Sync {
fn open(&self, label: String, src: Source)
-> Result<(h264::ExtraData, Box<dyn Stream>), Error>;
/// Opens the given RTSP URL.
///
/// Note: despite the blocking interface, this expects to be called from
/// a tokio runtime with IO and time enabled. Takes the
/// [`tokio::runtime::Runtime`] rather than using
/// `tokio::runtime::Handle::current()` because `Runtime::block_on` can
/// drive IO and timers while `Handle::block_on` can not.
fn open<'a>(
&self,
rt: &'a tokio::runtime::Runtime,
label: String,
url: Url,
options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream + 'a>), Error>;
}
pub struct VideoFrame<'a> {
pub struct VideoFrame {
pub pts: i64,
/// An estimate of the duration of the frame, or zero.
@@ -100,308 +41,54 @@ pub struct VideoFrame<'a> {
pub duration: i32,
pub is_key: bool,
pub data: &'a [u8],
pub data: Bytes,
}
pub trait Stream: Send {
fn next(&mut self) -> Result<VideoFrame, Error>;
}
pub struct Ffmpeg {}
pub struct RealOpener;
impl Ffmpeg {
fn new() -> Ffmpeg {
START_FFMPEG.call_once(|| {
ffmpeg::Ffmpeg::new();
});
Ffmpeg {}
}
}
pub const OPENER: RealOpener = RealOpener;
impl Opener for Ffmpeg {
fn open(
impl Opener for RealOpener {
fn open<'a>(
&self,
rt: &'a tokio::runtime::Runtime,
label: String,
src: Source,
) -> Result<(h264::ExtraData, Box<dyn Stream>), Error> {
use ffmpeg::avformat::InputFormatContext;
let mut input = match src {
#[cfg(test)]
Source::File(filename) => {
let mut open_options = ffmpeg::avutil::Dictionary::new();
// Work around https://github.com/scottlamb/moonfire-nvr/issues/10
open_options
.set(cstr!("advanced_editlist"), cstr!("false"))
.unwrap();
let url = format!("file:{}", filename);
let i = InputFormatContext::open(
&CString::new(url.clone()).unwrap(),
&mut open_options,
)?;
if !open_options.empty() {
warn!(
"{}: While opening URL {}, some options were not understood: {}",
&label, url, open_options
);
}
i
}
Source::Rtsp {
url,
username,
password,
transport,
..
} => {
let mut open_options = ffmpeg::avutil::Dictionary::new();
open_options
.set(
cstr!("rtsp_transport"),
match transport {
Transport::Tcp => cstr!("tcp"),
Transport::Udp => cstr!("udp"),
},
)
.unwrap();
open_options
.set(cstr!("user-agent"), cstr!("moonfire-nvr"))
.unwrap();
// 10-second socket timeout, in microseconds.
open_options
.set(cstr!("stimeout"), cstr!("10000000"))
.unwrap();
// Without this option, the first packet has an incorrect pts.
// https://trac.ffmpeg.org/ticket/5018
open_options
.set(cstr!("fflags"), cstr!("nobuffer"))
.unwrap();
// Moonfire NVR currently only supports video, so receiving audio is wasteful.
// It also triggers <https://github.com/scottlamb/moonfire-nvr/issues/36>.
open_options
.set(cstr!("allowed_media_types"), cstr!("video"))
.unwrap();
let mut url_with_credentials = url.clone();
if let Some(u) = username.as_deref() {
url_with_credentials
.set_username(u)
.map_err(|_| format_err!("unable to set username on url {}", url))?;
}
url_with_credentials
.set_password(password.as_deref())
.map_err(|_| format_err!("unable to set password on url {}", url))?;
let i = InputFormatContext::open(
&CString::new(url_with_credentials.as_str())?,
&mut open_options,
)?;
if !open_options.empty() {
warn!(
"{}: While opening URL {}, some options were not understood: {}",
&label, url, open_options
);
}
i
}
};
input.find_stream_info()?;
// Find the video stream.
let mut video_i = None;
{
let s = input.streams();
for i in 0..s.len() {
if s.get(i).codecpar().codec_type().is_video() {
video_i = Some(i);
break;
}
}
}
let video_i = match video_i {
Some(i) => i,
None => bail!("no video stream"),
};
let video = input.streams().get(video_i);
let codec = video.codecpar();
let codec_id = codec.codec_id();
if !codec_id.is_h264() {
bail!("stream's video codec {:?} is not h264", codec_id);
}
let tb = video.time_base();
if tb.num != 1 || tb.den != 90000 {
bail!(
"video stream has timebase {}/{}; expected 1/90000",
tb.num,
tb.den
);
}
let dims = codec.dims();
let extra_data = h264::ExtraData::parse(
codec.extradata(),
u16::try_from(dims.width)?,
u16::try_from(dims.height)?,
)?;
let need_transform = extra_data.need_transform;
let stream = Box::new(FfmpegStream {
input,
video_i,
data: Vec::new(),
need_transform,
});
Ok((extra_data, stream))
}
}
struct FfmpegStream {
input: ffmpeg::avformat::InputFormatContext<'static>,
video_i: usize,
data: Vec<u8>,
need_transform: bool,
}
impl Stream for FfmpegStream {
fn next(&mut self) -> Result<VideoFrame, Error> {
let pkt = loop {
let pkt = self.input.read_frame()?;
if pkt.stream_index() == self.video_i {
break pkt;
}
};
let data = pkt
.data()
.ok_or_else(|| format_err!("packet with no data"))?;
if self.need_transform {
h264::transform_sample_data(data, &mut self.data)?;
} else {
// This copy isn't strictly necessary, but this path is only taken in testing anyway.
self.data.clear();
self.data.extend_from_slice(data);
}
let pts = pkt.pts().ok_or_else(|| format_err!("packet with no pts"))?;
Ok(VideoFrame {
pts,
is_key: pkt.is_key(),
duration: pkt.duration(),
data: &self.data,
})
}
}
pub struct RetinaOpener {}
pub const RETINA: RetinaOpener = RetinaOpener {};
impl Opener for RetinaOpener {
fn open(
&self,
label: String,
src: Source,
) -> Result<(h264::ExtraData, Box<dyn Stream>), Error> {
let (startup_tx, startup_rx) = tokio::sync::oneshot::channel();
let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::runtime::Handle::current();
let (url, options) = match src {
#[cfg(test)]
Source::File(_) => bail!("Retina doesn't support .mp4 files"),
Source::Rtsp {
url,
username,
password,
transport,
session_group,
} => (
url,
retina::client::SessionOptions::default()
.creds(match (username, password) {
(None, None) => None,
(Some(username), password) => Some(Credentials {
username,
password: password.unwrap_or_default(),
}),
_ => bail!("must supply username when supplying password"),
})
.transport(transport)
.session_group(session_group)
.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))),
),
};
handle.spawn(async move {
let r = tokio::time::timeout(RETINA_TIMEOUT, RetinaOpener::play(url, options)).await;
let (mut session, video_params, first_frame) =
match r.unwrap_or_else(|_| Err(format_err!("timeout opening stream"))) {
Err(e) => {
let _ = startup_tx.send(Err(e));
return;
}
Ok((s, p, f)) => (s, p, f),
};
if startup_tx.send(Ok(video_params)).is_err() {
return;
}
if frame_tx.send(Ok(first_frame)).await.is_err() {
return;
}
// Read following frames.
let mut deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
loop {
match tokio::time::timeout_at(deadline, session.next()).await {
Err(_) => {
let _ = frame_tx
.send(Err(format_err!("timeout getting next frame")))
.await;
return;
}
Ok(Some(Err(e))) => {
let _ = frame_tx.send(Err(e.into())).await;
return;
}
Ok(None) => break,
Ok(Some(Ok(CodecItem::VideoFrame(v)))) => {
if let Some(p) = v.new_parameters {
// TODO: we could start a new recording without dropping the connection.
let _ = frame_tx.send(Err(format_err!("parameter; change: {:?}", p)));
return;
}
deadline = tokio::time::Instant::now() + RETINA_TIMEOUT;
if v.loss > 0 {
log::warn!(
"{}: lost {} RTP packets @ {}",
&label,
v.loss,
v.start_ctx()
);
}
if frame_tx.send(Ok(v)).await.is_err() {
return; // other end died.
}
}
Ok(Some(Ok(_))) => {}
}
}
});
let video_params = handle.block_on(startup_rx)??;
let dims = video_params.pixel_dimensions();
let extra_data = h264::ExtraData::parse(
video_params.extra_data(),
u16::try_from(dims.0)?,
u16::try_from(dims.1)?,
)?;
url: Url,
options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream + 'a>), Error> {
let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
let (session, video_params, first_frame) = rt.block_on(tokio::time::timeout(
RETINA_TIMEOUT,
RetinaStream::play(url, options),
))??;
let extra_data = h264::parse_extra_data(video_params.extra_data())?;
let stream = Box::new(RetinaStream {
frame_rx,
frame: None,
rt,
label,
session,
first_frame: Some(first_frame),
});
Ok((extra_data, stream))
}
}
impl RetinaOpener {
struct RetinaStream<'a> {
rt: &'a tokio::runtime::Runtime,
label: String,
session: Pin<Box<Demuxed>>,
/// The first frame, if not yet returned from `next`.
///
/// This frame is special because we sometimes need to fetch it as part of getting the video
/// parameters.
first_frame: Option<retina::codec::VideoFrame>,
}
impl<'a> RetinaStream<'a> {
/// Plays to first frame. No timeout; that's the caller's responsibility.
async fn play(
url: Url,
@@ -459,27 +146,119 @@ impl RetinaOpener {
first_frame,
))
}
/// Fetches a non-initial frame.
async fn fetch_next_frame(
label: &str,
mut session: Pin<&mut Demuxed>,
) -> Result<retina::codec::VideoFrame, Error> {
loop {
match session.next().await.transpose()? {
None => bail!("end of stream"),
Some(CodecItem::VideoFrame(v)) => {
if let Some(p) = v.new_parameters {
// TODO: we could start a new recording without dropping the connection.
bail!("parameter change: {:?}", p);
}
if v.loss > 0 {
log::warn!(
"{}: lost {} RTP packets @ {}",
&label,
v.loss,
v.start_ctx()
);
}
return Ok(v);
}
Some(_) => {}
}
}
}
}
struct RetinaStream {
frame_rx: tokio::sync::mpsc::Receiver<Result<retina::codec::VideoFrame, Error>>,
frame: Option<retina::codec::VideoFrame>,
}
impl Stream for RetinaStream {
impl<'a> Stream for RetinaStream<'a> {
fn next(&mut self) -> Result<VideoFrame, Error> {
// TODO: use Option::insert after bumping MSRV to 1.53.
self.frame = Some(
self.frame_rx
.blocking_recv()
.ok_or_else(|| format_err!("stream ended"))??,
);
let frame = self.frame.as_ref().unwrap();
let frame = self.first_frame.take().map(Ok).unwrap_or_else(|| {
self.rt
.block_on(tokio::time::timeout(
RETINA_TIMEOUT,
RetinaStream::fetch_next_frame(&self.label, self.session.as_mut()),
))
.map_err(|_| format_err!("timeout getting next frame"))?
})?;
Ok(VideoFrame {
pts: frame.timestamp.elapsed(),
duration: 0,
is_key: frame.is_random_access_point,
data: &frame.data()[..],
data: frame.into_data(),
})
}
}
#[cfg(test)]
pub mod testutil {
use super::*;
use std::convert::TryFrom;
use std::io::Cursor;
pub struct Mp4Stream {
reader: mp4::Mp4Reader<Cursor<Vec<u8>>>,
h264_track_id: u32,
next_sample_id: u32,
}
impl Mp4Stream {
/// Opens a stream, with a return matching that expected by [`Opener`].
pub fn open(path: &str) -> Result<(db::VideoSampleEntryToInsert, Self), Error> {
let f = std::fs::read(path)?;
let len = f.len();
let reader = mp4::Mp4Reader::read_header(Cursor::new(f), u64::try_from(len)?)?;
let h264_track = match reader
.tracks()
.values()
.find(|t| matches!(t.media_type(), Ok(mp4::MediaType::H264)))
{
None => bail!("expected a H.264 track"),
Some(t) => t,
};
let extra_data = h264::parse_extra_data(&h264_track.extra_data()?[..])?;
let h264_track_id = h264_track.track_id();
let stream = Mp4Stream {
reader,
h264_track_id,
next_sample_id: 1,
};
Ok((extra_data, stream))
}
pub fn duration(&self) -> u64 {
self.reader.moov.mvhd.duration
}
/// Returns the edit list from the H.264 stream, if any.
pub fn elst(&self) -> Option<&mp4::mp4box::elst::ElstBox> {
let h264_track = self.reader.tracks().get(&self.h264_track_id).unwrap();
h264_track
.trak
.edts
.as_ref()
.and_then(|edts| edts.elst.as_ref())
}
}
impl Stream for Mp4Stream {
fn next(&mut self) -> Result<VideoFrame, Error> {
let sample = self
.reader
.read_sample(self.h264_track_id, self.next_sample_id)?
.ok_or_else(|| format_err!("End of file"))?;
self.next_sample_id += 1;
Ok(VideoFrame {
pts: sample.start_time as i64,
duration: sample.duration as i32,
is_key: sample.is_sync,
data: sample.bytes,
})
}
}
}

View File

@@ -110,11 +110,15 @@ where
}
/// Runs the streamer; blocks.
/// Note that when using Retina as the RTSP library, this must be called
/// within a tokio runtime context; see [tokio::runtime::Handle].
pub fn run(&mut self) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
let _guard = rt.enter();
while self.shutdown_rx.check().is_ok() {
if let Err(e) = self.run_once() {
if let Err(e) = self.run_once(&rt) {
let sleep_time = time::Duration::seconds(1);
warn!(
"{}: sleeping for {} after error: {}",
@@ -128,7 +132,7 @@ where
info!("{}: shutting down", self.short_name);
}
fn run_once(&mut self) -> Result<(), Error> {
fn run_once(&mut self, rt: &tokio::runtime::Runtime) -> Result<(), Error> {
info!("{}: Opening input: {}", self.short_name, self.url.as_str());
let clocks = self.db.clocks();
@@ -142,7 +146,7 @@ where
max_expires.saturating_duration_since(tokio::time::Instant::now()),
status.num_sessions
);
tokio::runtime::Handle::current().block_on(async {
rt.block_on(async {
tokio::select! {
_ = self.session_group.await_stale_sessions(&status) => Ok(()),
_ = self.shutdown_rx.as_future() => Err(base::shutdown::ShutdownError),
@@ -160,28 +164,26 @@ where
let (extra_data, mut stream) = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open(
rt,
self.short_name.clone(),
stream::Source::Rtsp {
url: self.url.clone(),
username: if self.username.is_empty() {
self.url.clone(),
retina::client::SessionOptions::default()
.creds(if self.username.is_empty() {
None
} else {
Some(self.username.clone())
},
password: if self.password.is_empty() {
None
} else {
Some(self.password.clone())
},
transport: self.transport,
session_group: self.session_group.clone(),
},
Some(retina::client::Credentials {
username: self.username.clone(),
password: self.password.clone(),
})
})
.transport(self.transport)
.session_group(self.session_group.clone()),
)?
};
let realtime_offset = self.db.clocks().realtime() - clocks.monotonic();
let video_sample_entry_id = {
let _t = TimerGuard::new(&clocks, || "inserting video sample entry");
self.db.lock().insert_video_sample_entry(extra_data.entry)?
self.db.lock().insert_video_sample_entry(extra_data)?
};
let mut seen_key_frame = false;
@@ -253,7 +255,7 @@ where
let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len()));
w.write(
&mut self.shutdown_rx,
pkt.data,
&pkt.data[..],
local_time,
pkt.pts,
pkt.is_key,
@@ -270,8 +272,7 @@ where
#[cfg(test)]
mod tests {
use crate::h264;
use crate::stream::{self, Opener, Stream};
use crate::stream::{self, Stream};
use base::clock::{self, Clocks};
use db::{recording, testutil, CompositeId};
use failure::{bail, Error};
@@ -353,20 +354,19 @@ mod tests {
struct MockOpener {
expected_url: url::Url,
streams: Mutex<Vec<(h264::ExtraData, Box<dyn stream::Stream>)>>,
streams: Mutex<Vec<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>)>>,
shutdown_tx: Mutex<Option<base::shutdown::Sender>>,
}
impl stream::Opener for MockOpener {
fn open(
&self,
_rt: &tokio::runtime::Runtime,
_label: String,
src: stream::Source,
) -> Result<(h264::ExtraData, Box<dyn stream::Stream>), Error> {
match src {
stream::Source::Rtsp { url, .. } => assert_eq!(&url, &self.expected_url),
stream::Source::File(_) => panic!("expected rtsp url"),
};
url: url::Url,
_options: retina::client::SessionOptions,
) -> Result<(db::VideoSampleEntryToInsert, Box<dyn stream::Stream>), Error> {
assert_eq!(&url, &self.expected_url);
let mut l = self.streams.lock();
match l.pop() {
Some(stream) => {
@@ -412,13 +412,10 @@ mod tests {
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
let (extra_data, stream) = stream::FFMPEG
.open(
"test".to_owned(),
stream::Source::File("src/testdata/clip.mp4"),
)
.unwrap();
let mut stream = ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), stream);
let (extra_data, stream) =
stream::testutil::Mp4Stream::open("src/testdata/clip.mp4").unwrap();
let mut stream =
ProxyingStream::new(clocks.clone(), time::Duration::seconds(2), Box::new(stream));
stream.ts_offset = 123456; // starting pts of the input should be irrelevant
stream.ts_offset_pkts_left = u32::max_value();
stream.pkts_left = u32::max_value();