use my own ffmpeg crate

This significantly improves safety of the ffmpeg interface. The complex
ABIs aren't accessed directly from Rust. Instead, I have a small C
wrapper which uses the ffmpeg C API and the C headers at compile-time to
determine the proper ABI in the same way any C program using ffmpeg
would, so that the ABI doesn't have to be duplicated in Rust code.
I've tested with ffmpeg 2.x and ffmpeg 3.x; it seems to work properly
with both where before ffmpeg 3.x caused segfaults.

It still depends on ABI compatibility between the compiled and running
versions. C programs need this, too, and normal shared library
versioning practices provide this guarantee. But log both versions on
startup for diagnosing problems with this.

Fixes #7
This commit is contained in:
Scott Lamb
2017-09-20 21:06:06 -07:00
parent 8ff1d0dcb8
commit 857a66f29c
14 changed files with 768 additions and 170 deletions

View File

@@ -182,8 +182,6 @@ impl SampleFileDir {
write!(&mut buf[..36], "{}", uuid.hyphenated()).expect("can't format uuid to pathname buf");
// libc::c_char seems to be i8 on some platforms (Linux/arm) and u8 on others (Linux/amd64).
// Transmute, suppressing the warning that happens on platforms in which it's already u8.
#[allow(useless_transmute)]
unsafe { mem::transmute::<[u8; 37], [libc::c_char; 37]>(buf) }
}

View File

@@ -34,8 +34,8 @@ extern crate uuid;
use core::ops::Deref;
use core::num;
use ffmpeg;
use openssl::error::ErrorStack;
use moonfire_ffmpeg;
use serde_json;
use std::boxed::Box;
use std::convert::From;
@@ -127,9 +127,9 @@ impl From<serde_json::Error> for Error {
}
}
impl From<ffmpeg::Error> for Error {
fn from(err: ffmpeg::Error) -> Self {
Error{description: format!("{} ({})", err.description(), err), cause: Some(Box::new(err))}
impl From<moonfire_ffmpeg::Error> for Error {
fn from(err: moonfire_ffmpeg::Error) -> Self {
Error{description: format!("ffmpeg: {}", err), cause: Some(Box::new(err))}
}
}

View File

@@ -33,8 +33,6 @@
extern crate byteorder;
extern crate core;
extern crate docopt;
#[macro_use] extern crate ffmpeg;
extern crate ffmpeg_sys;
extern crate futures;
extern crate fnv;
extern crate http_entity;
@@ -47,6 +45,7 @@ extern crate reffers;
extern crate rusqlite;
extern crate memmap;
extern crate mime;
extern crate moonfire_ffmpeg;
extern crate mylog;
extern crate openssl;
extern crate parking_lot;

View File

@@ -83,7 +83,7 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use db;
use dir;
use error::Error;
use futures::{Future, Stream};
use futures::Stream;
use futures::stream;
use http_entity;
use hyper::header;
@@ -1242,8 +1242,8 @@ mod tests {
use byteorder::{BigEndian, ByteOrder};
use db;
use dir;
use futures::Future;
use futures::Stream as FuturesStream;
use ffmpeg;
use hyper::header;
use openssl::hash;
use recording::{self, TIME_UNITS_PER_SEC};
@@ -1475,14 +1475,14 @@ mod tests {
loop {
let pkt = match input.get_next() {
Ok(p) => p,
Err(ffmpeg::Error::Eof) => { break; },
Err(e) if e.is_eof() => { break; },
Err(e) => { panic!("unexpected input error: {}", e); },
};
let pts = pkt.pts().unwrap();
frame_time += recording::Duration(pkt.duration());
frame_time += recording::Duration(pkt.duration() as i64);
output.write(pkt.data().expect("packet without data"), frame_time, pts,
pkt.is_key()).unwrap();
end_pts = Some(pts + pkt.duration());
end_pts = Some(pts + pkt.duration() as i64);
}
output.close(end_pts).unwrap();
db.syncer_channel.flush();
@@ -1528,12 +1528,12 @@ mod tests {
loop {
let orig_pkt = match orig.get_next() {
Ok(p) => Some(p),
Err(ffmpeg::Error::Eof) => None,
Err(e) if e.is_eof() => None,
Err(e) => { panic!("unexpected input error: {}", e); },
};
let new_pkt = match new.get_next() {
Ok(p) => Some(p),
Err(ffmpeg::Error::Eof) => { break; },
Err(e) if e.is_eof() => { break; },
Err(e) => { panic!("unexpected input error: {}", e); },
};
let (orig_pkt, new_pkt) = match (orig_pkt, new_pkt) {
@@ -1545,7 +1545,7 @@ mod tests {
assert_eq!(orig_pkt.dts(), new_pkt.dts() + pts_offset);
assert_eq!(orig_pkt.data(), new_pkt.data());
assert_eq!(orig_pkt.is_key(), new_pkt.is_key());
final_durations = Some((orig_pkt.duration(), new_pkt.duration()));
final_durations = Some((orig_pkt.duration() as i64, new_pkt.duration() as i64));
}
if let Some((orig_dur, new_dur)) = final_durations {

View File

@@ -29,14 +29,10 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use error::Error;
use ffmpeg::{self, format, media};
use ffmpeg_sys::{self, AVLockOp};
use h264;
use libc::{self, c_int, c_void};
use std::mem;
use std::ptr;
use moonfire_ffmpeg;
use std::ffi::{CStr, CString};
use std::result::Result;
use std::slice;
use std::sync;
static START: sync::Once = sync::ONCE_INIT;
@@ -52,41 +48,13 @@ pub enum Source<'a> {
Rtsp(&'a str), // url, for production use.
}
// TODO: I think this should be provided by ffmpeg-sys. Otherwise, ffmpeg-sys is thread-hostile,
// which I believe is not allowed at all in Rust. (Also, this method's signature should include
// unsafe.)
extern "C" fn lock_callback(untyped_ptr: *mut *mut c_void, op: AVLockOp) -> c_int {
unsafe {
let ptr = mem::transmute::<*mut *mut c_void, *mut *mut libc::pthread_mutex_t>(untyped_ptr);
match op {
AVLockOp::AV_LOCK_CREATE => {
let m = Box::<libc::pthread_mutex_t>::new(mem::uninitialized());
*ptr = Box::into_raw(m);
libc::pthread_mutex_init(*ptr, ptr::null());
},
AVLockOp::AV_LOCK_DESTROY => {
libc::pthread_mutex_destroy(*ptr);
Box::from_raw(*ptr); // delete.
*ptr = ptr::null_mut();
},
AVLockOp::AV_LOCK_OBTAIN => {
libc::pthread_mutex_lock(*ptr);
},
AVLockOp::AV_LOCK_RELEASE => {
libc::pthread_mutex_unlock(*ptr);
},
};
};
0
}
pub trait Opener<S : Stream> : Sync {
fn open(&self, src: Source) -> Result<S, Error>;
}
pub trait Stream {
fn get_extra_data(&self) -> Result<h264::ExtraData, Error>;
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error>;
fn get_next<'p>(&'p mut self) -> Result<moonfire_ffmpeg::Packet<'p>, moonfire_ffmpeg::Error>;
}
pub struct Ffmpeg {}
@@ -94,41 +62,58 @@ pub struct Ffmpeg {}
impl Ffmpeg {
fn new() -> Ffmpeg {
START.call_once(|| {
unsafe { ffmpeg_sys::av_lockmgr_register(lock_callback); };
ffmpeg::init().unwrap();
ffmpeg::format::network::init();
moonfire_ffmpeg::Ffmpeg::new();
//ffmpeg::init().unwrap();
//ffmpeg::format::network::init();
});
Ffmpeg{}
}
}
macro_rules! c_str {
($s:expr) => { {
unsafe { CStr::from_ptr(concat!($s, "\0").as_ptr() as *const i8) }
} }
}
impl Opener<FfmpegStream> for Ffmpeg {
fn open(&self, src: Source) -> Result<FfmpegStream, Error> {
let (input, discard_first) = match src {
use moonfire_ffmpeg::InputFormatContext;
let (mut input, discard_first) = match src {
#[cfg(test)]
Source::File(filename) =>
(format::input_with(&format!("file:{}", filename), ffmpeg::Dictionary::new())?,
(InputFormatContext::open(&CString::new(format!("file:{}", filename)).unwrap(),
&mut moonfire_ffmpeg::Dictionary::new())?,
false),
Source::Rtsp(url) => {
let open_options = dict![
"rtsp_transport" => "tcp",
// https://trac.ffmpeg.org/ticket/5018 workaround attempt.
"probesize" => "262144",
"user-agent" => "moonfire-nvr",
// 10-second socket timeout, in microseconds.
"stimeout" => "10000000"
];
(format::input_with(&url, open_options)?, true)
let mut open_options = moonfire_ffmpeg::Dictionary::new();
open_options.set(c_str!("rtsp_transport"), c_str!("tcp")).unwrap();
// https://trac.ffmpeg.org/ticket/5018 workaround attempt.
open_options.set(c_str!("probesize"), c_str!("262144")).unwrap();
open_options.set(c_str!("user-agent"), c_str!("moonfire-nvr")).unwrap();
// 10-second socket timeout, in microseconds.
open_options.set(c_str!("stimeout"), c_str!("10000000")).unwrap();
let i = InputFormatContext::open(&CString::new(url).unwrap(), &mut open_options)?;
if !open_options.empty() {
warn!("While opening URL {}, some options were not understood: {}",
url, open_options);
}
(i, true)
},
};
input.find_stream_info()?;
// Find the video stream.
let mut video_i = None;
for (i, stream) in input.streams().enumerate() {
if stream.codec().medium() == media::Type::Video {
debug!("Video stream index is {}", i);
video_i = Some(i);
break;
{
let s = input.streams();
for i in 0 .. s.len() {
if s.get(i).codec().codec_type().is_video() {
debug!("Video stream index is {}", i);
video_i = Some(i);
break;
}
}
}
let video_i = match video_i {
@@ -137,8 +122,8 @@ impl Opener<FfmpegStream> for Ffmpeg {
};
let mut stream = FfmpegStream{
input: input,
video_i: video_i,
input,
video_i,
};
if discard_first {
@@ -151,30 +136,31 @@ impl Opener<FfmpegStream> for Ffmpeg {
}
pub struct FfmpegStream {
input: format::context::Input,
input: moonfire_ffmpeg::InputFormatContext,
video_i: usize,
}
impl Stream for FfmpegStream {
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
let video = self.input.stream(self.video_i).expect("can't get video stream known to exist");
let video = self.input.streams().get(self.video_i);
let tb = video.time_base();
if tb.num != 1 || tb.den != 90000 {
return Err(Error::new(format!("video stream has timebase {}/{}; expected 1/90000",
tb.num, tb.den)));
}
let codec = video.codec();
let (extradata, width, height) = unsafe {
let ptr = codec.as_ptr();
(slice::from_raw_parts((*ptr).extradata, (*ptr).extradata_size as usize),
(*ptr).width as u16,
(*ptr).height as u16)
};
// TODO: verify video stream is h264.
h264::ExtraData::parse(extradata, width, height)
let codec_id = codec.codec_id();
if !codec_id.is_h264() {
return Err(Error::new(format!("stream's video codec {:?} is not h264", codec_id)));
}
h264::ExtraData::parse(codec.extradata(), codec.width() as u16, codec.height() as u16)
}
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error> {
let mut pkt = ffmpeg::Packet::empty();
fn get_next<'i>(&'i mut self) -> Result<moonfire_ffmpeg::Packet<'i>, moonfire_ffmpeg::Error> {
loop {
pkt.read(&mut self.input)?;
if pkt.stream() == self.video_i {
return Ok(pkt);
let p = self.input.read_frame()?;
if p.stream_index() == self.video_i {
return Ok(p);
}
}
}

View File

@@ -113,7 +113,6 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?;
let realtime_offset = self.clocks.realtime() - self.clocks.monotonic();
// TODO: verify time base.
// TODO: verify width/height.
let extra_data = stream.get_extra_data()?;
let video_sample_entry_id =
@@ -190,9 +189,8 @@ mod tests {
use clock::{self, Clocks};
use db;
use error::Error;
use ffmpeg;
use ffmpeg::packet::Mut;
use h264;
use moonfire_ffmpeg;
use recording;
use std::cmp;
use std::sync::{Arc, Mutex};
@@ -228,9 +226,9 @@ mod tests {
}
impl<'a> Stream for ProxyingStream<'a> {
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error> {
fn get_next(&mut self) -> Result<moonfire_ffmpeg::Packet, moonfire_ffmpeg::Error> {
if self.pkts_left == 0 {
return Err(ffmpeg::Error::Eof);
return Err(moonfire_ffmpeg::Error::eof());
}
self.pkts_left -= 1;
@@ -240,7 +238,7 @@ mod tests {
// Avoid accumulating conversion error by tracking the total amount to sleep and how
// much we've already slept, rather than considering each frame in isolation.
{
let goal = pkt.pts().unwrap() + pkt.duration();
let goal = pkt.pts().unwrap() + pkt.duration() as i64;
let goal = time::Duration::nanoseconds(
goal * 1_000_000_000 / recording::TIME_UNITS_PER_SEC);
let duration = goal - self.slept;
@@ -254,15 +252,12 @@ mod tests {
self.ts_offset_pkts_left -= 1;
let old_pts = pkt.pts().unwrap();
let old_dts = pkt.dts();
unsafe {
let pkt = pkt.as_mut_ptr();
(*pkt).pts = old_pts + self.ts_offset;
(*pkt).dts = old_dts + self.ts_offset;
pkt.set_pts(Some(old_pts + self.ts_offset));
pkt.set_dts(old_dts + self.ts_offset);
// In a real rtsp stream, the duration of a packet is not known until the
// next packet. ffmpeg's duration is an unreliable estimate.
(*pkt).duration = recording::TIME_UNITS_PER_SEC as i32;
}
// In a real rtsp stream, the duration of a packet is not known until the
// next packet. ffmpeg's duration is an unreliable estimate.
pkt.set_duration(recording::TIME_UNITS_PER_SEC as i32);
}
Ok(pkt)