add a basic test of Streamer, fix it

This test is copied from the C++ implementation. It ensures the timestamps are
calculated accurately from the pts rather than using ffmpeg's estimated
duration. The Rust implementation was doing the easy-but-inaccurate thing, so
fix that to make the test pass.

Additionally, I did this with a code structure that should ensure the Rust
code never drops a Writer without indicating to the syncer that its uuid is
abandoned. Such a bug essentially leaks the partially-written file, although a
restart would cause it to be properly unlinked and marked as such. There are
no tests (yet) that exercise this scenario, though.
This commit is contained in:
Scott Lamb 2016-12-06 18:41:44 -08:00
parent 3332f817c0
commit 8df0eae567
7 changed files with 522 additions and 151 deletions

81
src/clock.rs Normal file
View File

@ -0,0 +1,81 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Clock interface and implementations for testability.
#[cfg(test)] use std::sync::Mutex;
use std::thread;
use time;
/// Abstract interface to the system clock. This is for testability.
pub trait Clock : Sync {
/// Gets the current time.
fn get_time(&self) -> time::Timespec;
/// Causes the current thread to sleep for the specified time.
fn sleep(&self, how_long: time::Duration);
}
/// Singleton "real" clock.
pub static REAL: RealClock = RealClock {};
/// Real clock; see static `REAL` instance.
pub struct RealClock {}
impl Clock for RealClock {
fn get_time(&self) -> time::Timespec { time::get_time() }
fn sleep(&self, how_long: time::Duration) {
match how_long.to_std() {
Ok(d) => thread::sleep(d),
Err(e) => warn!("Invalid duration {:?}: {}", how_long, e),
};
}
}
/// Simulated clock for testing.
#[cfg(test)]
pub struct SimulatedClock(Mutex<time::Timespec>);
#[cfg(test)]
impl SimulatedClock {
pub fn new() -> SimulatedClock { SimulatedClock(Mutex::new(time::Timespec::new(0, 0))) }
}
#[cfg(test)]
impl Clock for SimulatedClock {
fn get_time(&self) -> time::Timespec { *self.0.lock().unwrap() }
/// Advances the clock by the specified amount without actually sleeping.
fn sleep(&self, how_long: time::Duration) {
let mut l = self.0.lock().unwrap();
*l = *l + how_long;
}
}

View File

@ -30,13 +30,13 @@
//! Sample file directory management.
//!
//! This includes opening files for serving, rotating away old
//! files, and syncing new files to disk.
//! This includes opening files for serving, rotating away old files, and saving new files.
use db;
use libc;
use recording;
use error::Error;
use openssl::crypto::hash;
use std::ffi;
use std::fs;
use std::io::{self, Write};
@ -120,9 +120,9 @@ impl SampleFileDir {
/// Note this doesn't wait for previous rotation to complete; it's assumed the sample file
/// directory has sufficient space for a couple recordings per camera in addition to the
/// cameras' total `retain_bytes`.
pub fn create_writer(&self, start: recording::Time, local_start: recording::Time,
camera_id: i32, video_sample_entry_id: i32)
-> Result<recording::Writer, Error> {
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time,
local_start: recording::Time, camera_id: i32,
video_sample_entry_id: i32) -> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their
// transaction. But if not, perform an extra database transaction to reserve a new one.
@ -145,7 +145,7 @@ impl SampleFileDir {
return Err(e.into());
},
};
recording::Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id)
Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id, channel)
}
/// Opens a sample file within this directory with the given flags and (if creating) mode.
@ -197,7 +197,8 @@ struct SharedMutableState {
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
enum SyncerCommand {
AsyncSaveWriter(db::RecordingToInsert, fs::File),
AsyncSaveRecording(db::RecordingToInsert, fs::File),
AsyncAbandonRecording(Uuid),
#[cfg(test)]
Flush(mpsc::SyncSender<()>),
@ -242,10 +243,12 @@ pub fn start_syncer(dir: Arc<SampleFileDir>)
impl SyncerChannel {
/// Asynchronously syncs the given writer, closes it, records it into the database, and
/// starts rotation.
pub fn async_save_writer(&self, w: recording::Writer) -> Result<(), Error> {
let (recording, f) = w.close()?;
self.0.send(SyncerCommand::AsyncSaveWriter(recording, f)).unwrap();
Ok(())
fn async_save_recording(&self, recording: db::RecordingToInsert, f: fs::File) {
self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap();
}
fn async_abandon_recording(&self, uuid: Uuid) {
self.0.send(SyncerCommand::AsyncAbandonRecording(uuid)).unwrap();
}
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
@ -262,8 +265,8 @@ impl SyncerState {
loop {
match self.cmds.recv() {
Err(_) => return, // all senders have closed the channel; shutdown
Ok(SyncerCommand::AsyncSaveWriter(recording, f)) => self.save_writer(recording, f),
Ok(SyncerCommand::AsyncSaveRecording(recording, f)) => self.save(recording, f),
Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid),
#[cfg(test)]
Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
};
@ -301,11 +304,11 @@ impl SyncerState {
Ok(())
}
/// Saves the given writer and causes rotation to happen.
/// Saves the given recording and causes rotation to happen.
/// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
/// so that there can be only one dir sync and database transaction per save.
fn save_writer(&mut self, recording: db::RecordingToInsert, f: fs::File) {
if let Err(e) = self.save_writer_helper(&recording, f) {
fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) {
if let Err(e) = self.save_helper(&recording, f) {
error!("camera {}: will discard recording {} due to error while saving: {}",
recording.camera_id, recording.sample_file_uuid, e);
self.to_unlink.push(recording.sample_file_uuid);
@ -313,9 +316,14 @@ impl SyncerState {
}
}
/// Internal helper for `save_writer`. This is separated out so that the question-mark operator
fn abandon(&mut self, uuid: Uuid) {
self.to_unlink.push(uuid);
self.try_unlink();
}
/// Internal helper for `save`. This is separated out so that the question-mark operator
/// can be used in the many error paths.
fn save_writer_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File)
fn save_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File)
-> Result<(), Error> {
self.try_unlink();
if !self.to_unlink.is_empty() {
@ -398,3 +406,143 @@ impl SyncerState {
});
}
}
/// Single-use struct to write a single recording to disk and commit its metadata to the database.
/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the
/// syncer when done. It either saves the recording to the database (if I/O errors do not prevent
/// this) or marks it as abandoned so that the syncer will attempt to unlink the file.
pub struct Writer<'a>(Option<InnerWriter<'a>>);
/// The state associated with a `Writer`. The indirection is for the `Drop` trait; `close` moves
/// `f` and `index.video_index` out of the `InnerWriter`, which is not allowed on a struct with
/// a `Drop` trait. To avoid this problem, the real state is surrounded by an `Option`. The
/// `Option` should none only after close is called, and thus never in a way visible to callers.
struct InnerWriter<'a> {
syncer_channel: &'a SyncerChannel,
f: fs::File,
index: recording::SampleIndexEncoder,
uuid: Uuid,
corrupt: bool,
hasher: hash::Hasher,
start_time: recording::Time,
local_time: recording::Time,
camera_id: i32,
video_sample_entry_id: i32,
/// A sample which has been written to disk but not added to `index`. Index writes are one
/// sample behind disk writes because the duration of a sample is the difference between its
/// pts and the next sample's pts. A sample is flushed when the next sample is written, when
/// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
unflushed_sample: Option<UnflushedSample>,
}
struct UnflushedSample {
pts_90k: i64,
len: i32,
is_key: bool,
}
impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time,
camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel)
-> Result<Self, Error> {
Ok(Writer(Some(InnerWriter{
syncer_channel: syncer_channel,
f: f,
index: recording::SampleIndexEncoder::new(),
uuid: uuid,
corrupt: false,
hasher: hash::Hasher::new(hash::Type::SHA1)?,
start_time: start_time,
local_time: local_time,
camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id,
unflushed_sample: None,
})))
}
pub fn write(&mut self, pkt: &[u8], pts_90k: i64, is_key: bool) -> Result<(), Error> {
let w = self.0.as_mut().unwrap();
if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k) as i32;
w.index.add_sample(duration, unflushed.len, unflushed.is_key);
}
let mut remaining = pkt;
while !remaining.is_empty() {
let written = match w.f.write(remaining) {
Ok(b) => b,
Err(e) => {
if remaining.len() < pkt.len() {
// Partially written packet. Truncate if possible.
if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) {
error!("After write to {} failed with {}, truncate failed with {}; \
sample file is corrupt.", w.uuid.hyphenated(), e, e2);
w.corrupt = true;
}
}
return Err(Error::from(e));
},
};
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample{
pts_90k: pts_90k,
len: pkt.len() as i32,
is_key: is_key});
w.hasher.update(pkt)?;
Ok(())
}
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
pub fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> {
self.0.take().unwrap().close(next_pts)
}
}
impl<'a> InnerWriter<'a> {
fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> {
if self.corrupt {
self.syncer_channel.async_abandon_recording(self.uuid);
return Err(Error::new(format!("recording {} is corrupt", self.uuid)));
}
if let Some(unflushed) = self.unflushed_sample.take() {
let duration = match next_pts {
None => 0,
Some(p) => (p - unflushed.pts_90k) as i32,
};
self.index.add_sample(duration, unflushed.len, unflushed.is_key);
}
let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]);
let end = self.start_time + recording::Duration(self.index.total_duration_90k as i64);
let recording = db::RecordingToInsert{
camera_id: self.camera_id,
sample_file_bytes: self.index.sample_file_bytes,
time: self.start_time .. end,
local_time: self.local_time,
video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id,
sample_file_uuid: self.uuid,
video_index: self.index.video_index,
sample_file_sha1: sha1_bytes,
};
self.syncer_channel.async_save_recording(recording, self.f);
Ok(end)
}
}
impl<'a> Drop for Writer<'a> {
fn drop(&mut self) {
if let Some(w) = self.0.take() {
// Swallow any error. The caller should only drop the Writer without calling close()
// if there's already been an error. The caller should report that. No point in
// complaining again.
let _ = w.close(None);
}
}
}

View File

@ -71,6 +71,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
mod clock;
mod db;
mod dir;
mod error;
@ -157,11 +158,18 @@ fn main() {
let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap();
let l = db.lock();
let cameras = l.cameras_by_id().len();
let env = streamer::Environment{
db: &db,
dir: &dir,
clock: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown,
};
for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() {
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64;
let mut streamer = streamer::Streamer::new(
db.clone(), dir.clone(), syncer_channel.clone(), shutdown.clone(), *id, camera,
rotate_offset_sec);
let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera,
rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC);
let name = format!("stream-{}", streamer.short_name());
streamers.push(thread::Builder::new().name(name).spawn(move|| {
streamer.run();

View File

@ -1200,7 +1200,7 @@ mod tests {
use std::sync::Arc;
use std::str;
use super::*;
use stream::StreamSource;
use stream::{self, Opener, Stream};
use testutil::{self, TestDb};
use uuid::Uuid;
@ -1397,25 +1397,34 @@ mod tests {
}
fn copy_mp4_to_db(db: &TestDb) {
let mut input = StreamSource::File("src/testdata/clip.mp4").open().unwrap();
let mut input =
stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
// 2015-04-26 00:00:00 UTC.
const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
let extra_data = input.get_extra_data().unwrap();
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(
extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap();
let mut output = db.dir.create_writer(START_TIME, START_TIME, TEST_CAMERA_ID,
video_sample_entry_id).unwrap();
let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME,
TEST_CAMERA_ID, video_sample_entry_id).unwrap();
// end_pts is the pts of the end of the most recent frame (start + duration).
// It's needed because dir::Writer calculates a packet's duration from its pts and the
// next packet's pts. That's more accurate for RTSP than ffmpeg's estimate of duration.
// To write the final packet of this sample .mp4 with a full duration, we need to fake a
// next packet's pts from the ffmpeg-supplied duration.
let mut end_pts = None;
loop {
let pkt = match input.get_next() {
Ok(p) => p,
Err(ffmpeg::Error::Eof) => { break; },
Err(e) => { panic!("unexpected input error: {}", e); },
};
output.write(pkt.data().expect("packet without data"), pkt.duration() as i32,
output.write(pkt.data().expect("packet without data"), pkt.pts().unwrap(),
pkt.is_key()).unwrap();
end_pts = Some(pkt.pts().unwrap() + pkt.duration());
}
db.syncer_channel.async_save_writer(output).unwrap();
output.close(end_pts).unwrap();
db.syncer_channel.flush();
}
@ -1475,8 +1484,8 @@ mod tests {
}
fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) {
let mut orig = StreamSource::File("src/testdata/clip.mp4").open().unwrap();
let mut new = StreamSource::File(new_filename).open().unwrap();
let mut orig = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
let mut new = stream::FFMPEG.open(stream::Source::File(new_filename)).unwrap();
assert_eq!(orig.get_extra_data().unwrap(), new.get_extra_data().unwrap());
let mut final_durations = None;
loop {

View File

@ -35,15 +35,11 @@ extern crate uuid;
use db;
use std::ops;
use error::Error;
use openssl::crypto::hash;
use std::fmt;
use std::fs;
use std::io::Write;
use std::ops::Range;
use std::string::String;
use std::sync::MutexGuard;
use time;
use uuid::Uuid;
pub const TIME_UNITS_PER_SEC: i64 = 90000;
pub const DESIRED_RECORDING_DURATION: i64 = 60 * TIME_UNITS_PER_SEC;
@ -170,18 +166,6 @@ pub struct SampleIndexEncoder {
pub video_index: Vec<u8>,
}
pub struct Writer {
f: fs::File,
index: SampleIndexEncoder,
uuid: Uuid,
corrupt: bool,
hasher: hash::Hasher,
start_time: Time,
local_time: Time,
camera_id: i32,
video_sample_entry_id: i32,
}
/// Zigzag-encodes a signed integer, as in [protocol buffer
/// encoding](https://developers.google.com/protocol-buffers/docs/encoding#types). Uses the low bit
/// to indicate signedness (1 = negative, 0 = non-negative).
@ -355,72 +339,6 @@ impl SampleIndexEncoder {
}
}
impl Writer {
pub fn open(f: fs::File, uuid: Uuid, start_time: Time, local_time: Time,
camera_id: i32, video_sample_entry_id: i32) -> Result<Self, Error> {
Ok(Writer{
f: f,
index: SampleIndexEncoder::new(),
uuid: uuid,
corrupt: false,
hasher: hash::Hasher::new(hash::Type::SHA1)?,
start_time: start_time,
local_time: local_time,
camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id,
})
}
pub fn write(&mut self, pkt: &[u8], duration_90k: i32, is_key: bool) -> Result<(), Error> {
let mut remaining = pkt;
while !remaining.is_empty() {
let written = match self.f.write(remaining) {
Ok(b) => b,
Err(e) => {
if remaining.len() < pkt.len() {
// Partially written packet. Truncate if possible.
if let Err(e2) = self.f.set_len(self.index.sample_file_bytes as u64) {
error!("After write to {} failed with {}, truncate failed with {}; \
sample file is corrupt.", self.uuid.hyphenated(), e, e2);
self.corrupt = true;
}
}
return Err(Error::from(e));
},
};
remaining = &remaining[written..];
}
self.index.add_sample(duration_90k, pkt.len() as i32, is_key);
self.hasher.update(pkt)?;
Ok(())
}
pub fn end(&self) -> Time {
self.start_time + Duration(self.index.total_duration_90k as i64)
}
// TODO: clean up this interface.
pub fn close(mut self) -> Result<(db::RecordingToInsert, fs::File), Error> {
if self.corrupt {
return Err(Error::new(format!("recording {} is corrupt", self.uuid)));
}
let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]);
Ok((db::RecordingToInsert{
camera_id: self.camera_id,
sample_file_bytes: self.index.sample_file_bytes,
time: self.start_time .. self.end(),
local_time: self.local_time,
video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id,
sample_file_uuid: self.uuid,
video_index: self.index.video_index,
sample_file_sha1: sha1_bytes,
}, self.f))
}
}
/// A segment represents a view of some or all of a single recording, starting from a key frame.
/// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4.
pub struct Segment {

View File

@ -41,7 +41,11 @@ use std::sync;
static START: sync::Once = sync::ONCE_INIT;
pub enum StreamSource<'a> {
lazy_static! {
pub static ref FFMPEG: Ffmpeg = Ffmpeg::new();
}
pub enum Source<'a> {
#[cfg(test)]
File(&'a str), // filename, for testing.
@ -76,21 +80,36 @@ extern "C" fn lock_callback(untyped_ptr: *mut *mut c_void, op: AVLockOp) -> c_in
0
}
impl<'a> StreamSource<'a> {
pub fn open(&self) -> Result<Stream, Error> {
pub trait Opener<S : Stream> : Sync {
fn open(&self, src: Source) -> Result<S, Error>;
}
pub trait Stream {
fn get_extra_data(&self) -> Result<h264::ExtraData, Error>;
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error>;
}
pub struct Ffmpeg {}
impl Ffmpeg {
fn new() -> Ffmpeg {
START.call_once(|| {
unsafe { ffmpeg_sys::av_lockmgr_register(lock_callback); };
ffmpeg::init().unwrap();
ffmpeg::format::network::init();
});
Ffmpeg{}
}
}
let (input, discard_first) = match *self {
impl Opener<FfmpegStream> for Ffmpeg {
fn open(&self, src: Source) -> Result<FfmpegStream, Error> {
let (input, discard_first) = match src {
#[cfg(test)]
StreamSource::File(filename) =>
Source::File(filename) =>
(format::input_with(&format!("file:{}", filename), ffmpeg::Dictionary::new())?,
false),
StreamSource::Rtsp(url) => {
Source::Rtsp(url) => {
let open_options = dict![
"rtsp_transport" => "tcp",
// https://trac.ffmpeg.org/ticket/5018 workaround attempt.
@ -117,7 +136,7 @@ impl<'a> StreamSource<'a> {
None => { return Err(Error::new("no video stream".to_owned())) },
};
let mut stream = Stream{
let mut stream = FfmpegStream{
input: input,
video_i: video_i,
};
@ -131,13 +150,13 @@ impl<'a> StreamSource<'a> {
}
}
pub struct Stream {
pub struct FfmpegStream {
input: format::context::Input,
video_i: usize,
}
impl Stream {
pub fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
impl Stream for FfmpegStream {
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> {
let video = self.input.stream(self.video_i).expect("can't get video stream known to exist");
let codec = video.codec();
let (extradata, width, height) = unsafe {
@ -150,7 +169,7 @@ impl Stream {
h264::ExtraData::parse(extradata, width, height)
}
pub fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error> {
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error> {
let mut pkt = ffmpeg::Packet::empty();
loop {
pkt.read(&mut self.input)?;

View File

@ -28,6 +28,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use clock::Clock;
use db::{Camera, Database};
use dir;
use error::Error;
@ -36,37 +37,50 @@ use recording;
use std::result::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use stream::StreamSource;
use stream;
use time;
pub static ROTATE_INTERVAL_SEC: i64 = 60;
pub struct Streamer {
/// Common state that can be used by multiple `Streamer` instances.
pub struct Environment<'a, 'b, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub clock: &'a C,
pub opener: &'a stream::Opener<S>,
pub db: &'b Arc<Database>,
pub dir: &'b Arc<dir::SampleFileDir>,
pub shutdown: &'b Arc<AtomicBool>,
}
pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
shutdown: Arc<AtomicBool>,
// State below is only used by the thread in Run.
rotate_offset_sec: i64,
rotate_interval_sec: i64,
db: Arc<Database>,
dir: Arc<dir::SampleFileDir>,
syncer_channel: dir::SyncerChannel,
clock: &'a C,
opener: &'a stream::Opener<S>,
camera_id: i32,
short_name: String,
url: String,
redacted_url: String,
}
impl Streamer {
pub fn new(db: Arc<Database>, dir: Arc<dir::SampleFileDir>, syncer_channel: dir::SyncerChannel,
shutdown: Arc<AtomicBool>, camera_id: i32, c: &Camera, rotate_offset_sec: i64)
-> Self {
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel,
camera_id: i32, c: &Camera, rotate_offset_sec: i64,
rotate_interval_sec: i64) -> Self {
Streamer{
shutdown: shutdown,
shutdown: env.shutdown.clone(),
rotate_offset_sec: rotate_offset_sec,
db: db,
dir: dir,
rotate_interval_sec: rotate_interval_sec,
db: env.db.clone(),
dir: env.dir.clone(),
syncer_channel: syncer_channel,
clock: env.clock,
opener: env.opener,
camera_id: camera_id,
short_name: c.short_name.to_owned(),
url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, c.main_rtsp_path),
@ -79,9 +93,9 @@ impl Streamer {
pub fn run(&mut self) {
while !self.shutdown.load(Ordering::SeqCst) {
if let Err(e) = self.run_once() {
let sleep_time = Duration::from_secs(1);
let sleep_time = time::Duration::seconds(1);
warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e);
thread::sleep(sleep_time);
self.clock.sleep(sleep_time);
}
}
info!("{}: shutting down", self.short_name);
@ -90,8 +104,7 @@ impl Streamer {
fn run_once(&mut self) -> Result<(), Error> {
info!("{}: Opening input: {}", self.short_name, self.redacted_url);
// TODO: mockability?
let mut stream = StreamSource::Rtsp(&self.url).open()?;
let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?;
// TODO: verify time base.
// TODO: verify width/height.
let extra_data = stream.get_extra_data()?;
@ -101,39 +114,38 @@ impl Streamer {
debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id);
let mut seen_key_frame = false;
let mut rotate = None;
let mut writer: Option<recording::Writer> = None;
let mut writer: Option<dir::Writer> = None;
let mut transformed = Vec::new();
let mut next_start = None;
while !self.shutdown.load(Ordering::SeqCst) {
let pkt = stream.get_next()?;
let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?;
if !seen_key_frame && !pkt.is_key() {
continue;
} else if !seen_key_frame {
debug!("{}: have first key frame", self.short_name);
seen_key_frame = true;
}
let frame_realtime = time::get_time();
let frame_realtime = self.clock.get_time();
if let Some(r) = rotate {
if frame_realtime.sec > r && pkt.is_key() {
let w = writer.take().expect("rotate set implies writer is set");
next_start = Some(w.end());
// TODO: restore this log message.
// info!("{}: wrote {}: [{}, {})", self.short_name, r.sample_file_uuid,
// r.time.start, r.time.end);
self.syncer_channel.async_save_writer(w)?;
trace!("{}: write on normal rotation", self.short_name);
next_start = Some(w.close(Some(pts))?);
}
};
let mut w = match writer {
Some(w) => w,
None => {
let r = frame_realtime.sec -
(frame_realtime.sec % ROTATE_INTERVAL_SEC) +
(frame_realtime.sec % self.rotate_interval_sec) +
self.rotate_offset_sec;
rotate = Some(
if r <= frame_realtime.sec { r + ROTATE_INTERVAL_SEC } else { r });
if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r });
let local_realtime = recording::Time::new(frame_realtime);
self.dir.create_writer(next_start.unwrap_or(local_realtime), local_realtime,
self.dir.create_writer(&self.syncer_channel,
next_start.unwrap_or(local_realtime), local_realtime,
self.camera_id, video_sample_entry_id)?
},
};
@ -147,12 +159,188 @@ impl Streamer {
} else {
orig_data
};
w.write(transformed_data, pkt.duration() as i32, pkt.is_key())?;
w.write(transformed_data, pts, pkt.is_key())?;
writer = Some(w);
}
if let Some(w) = writer {
self.syncer_channel.async_save_writer(w)?;
w.close(None)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use clock::{self, Clock};
use db;
use error::Error;
use ffmpeg;
use ffmpeg::packet::Mut;
use h264;
use recording;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicBool, Ordering};
use stream::{self, Opener, Stream};
use testutil;
use time;
struct ProxyingStream<'a> {
clock: &'a clock::SimulatedClock,
inner: stream::FfmpegStream,
last_duration: time::Duration,
ts_offset: i64,
ts_offset_pkts_left: u32,
pkts_left: u32,
}
impl<'a> ProxyingStream<'a> {
fn new(clock: &'a clock::SimulatedClock, inner: stream::FfmpegStream) -> ProxyingStream {
ProxyingStream {
clock: clock,
inner: inner,
last_duration: time::Duration::seconds(0),
ts_offset: 0,
ts_offset_pkts_left: 0,
pkts_left: 0,
}
}
}
impl<'a> Stream for ProxyingStream<'a> {
fn get_next(&mut self) -> Result<ffmpeg::Packet, ffmpeg::Error> {
if self.pkts_left == 0 {
return Err(ffmpeg::Error::Eof);
}
self.pkts_left -= 1;
// Advance clock to when this packet starts.
self.clock.sleep(self.last_duration);
let mut pkt = self.inner.get_next()?;
self.last_duration = time::Duration::nanoseconds(
pkt.duration() * 1_000_000_000 / recording::TIME_UNITS_PER_SEC);
if self.ts_offset_pkts_left > 0 {
self.ts_offset_pkts_left -= 1;
let old_pts = pkt.pts().unwrap();
let old_dts = pkt.dts();
unsafe {
let pkt = pkt.as_mut_ptr();
(*pkt).pts = old_pts + self.ts_offset;
(*pkt).dts = old_dts + self.ts_offset;
// In a real rtsp stream, the duration of a packet is not known until the
// next packet. ffmpeg's duration is an unreliable estimate.
(*pkt).duration = recording::TIME_UNITS_PER_SEC as i32;
}
}
Ok(pkt)
}
fn get_extra_data(&self) -> Result<h264::ExtraData, Error> { self.inner.get_extra_data() }
}
struct MockOpener<'a> {
expected_url: String,
streams: Mutex<Vec<ProxyingStream<'a>>>,
shutdown: Arc<AtomicBool>,
}
impl<'a> stream::Opener<ProxyingStream<'a>> for MockOpener<'a> {
fn open(&self, src: stream::Source) -> Result<ProxyingStream<'a>, Error> {
match src {
stream::Source::Rtsp(url) => assert_eq!(url, &self.expected_url),
stream::Source::File(_) => panic!("expected rtsp url"),
};
let mut l = self.streams.lock().unwrap();
match l.pop() {
Some(stream) => {
trace!("MockOpener returning next stream");
Ok(stream)
},
None => {
trace!("MockOpener shutting down");
self.shutdown.store(true, Ordering::SeqCst);
Err(Error::new("done".to_owned()))
},
}
}
}
#[derive(Debug, Eq, PartialEq)]
struct Frame {
start_90k: i32,
duration_90k: i32,
is_key: bool,
}
fn get_frames(db: &MutexGuard<db::LockedDatabase>, recording_id: i64) -> Vec<Frame> {
let rec = db.get_recording(recording_id).unwrap();
let mut it = recording::SampleIndexIterator::new();
let mut frames = Vec::new();
while it.next(&rec.video_index).unwrap() {
frames.push(Frame{
start_90k: it.start_90k,
duration_90k: it.duration_90k,
is_key: it.is_key,
});
}
frames
}
#[test]
fn basic() {
testutil::init();
let clock = clock::SimulatedClock::new();
clock.sleep(time::Duration::seconds(1430006400)); // 2015-04-26 00:00:00 UTC
let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
let mut stream = ProxyingStream::new(&clock, stream);
stream.ts_offset = 180000; // starting pts of the input should be irrelevant
stream.ts_offset_pkts_left = u32::max_value();
stream.pkts_left = u32::max_value();
let opener = MockOpener{
expected_url: "rtsp://foo:bar@test-camera/main".to_owned(),
streams: Mutex::new(vec![stream]),
shutdown: Arc::new(AtomicBool::new(false)),
};
let db = testutil::TestDb::new();
let env = super::Environment{
clock: &clock,
opener: &opener,
db: &db.db,
dir: &db.dir,
shutdown: &opener.shutdown,
};
let mut stream;
{
let l = db.db.lock();
let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap();
stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_CAMERA_ID,
camera, 0, 5);
}
stream.run();
assert!(opener.streams.lock().unwrap().is_empty());
db.syncer_channel.flush();
let db = db.db.lock();
// Compare frame-by-frame. Note below that while the rotation is scheduled to happen near
// 5-second boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next key
// frame, which in this case is 00:00:07.
assert_eq!(get_frames(&db, 1), &[
Frame{start_90k: 0, duration_90k: 90379, is_key: true},
Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
Frame{start_90k: 270012, duration_90k: 89981, is_key: false},
Frame{start_90k: 359993, duration_90k: 90055, is_key: true},
Frame{start_90k: 450048, duration_90k: 89967, is_key: false}, // pts_time 5.0005333...
Frame{start_90k: 540015, duration_90k: 90021, is_key: false},
Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
]);
assert_eq!(get_frames(&db, 2), &[
Frame{start_90k: 0, duration_90k: 90011, is_key: true},
Frame{start_90k: 90011, duration_90k: 0, is_key: false},
]);
}
}