diff --git a/src/clock.rs b/src/clock.rs new file mode 100644 index 0000000..f871d8d --- /dev/null +++ b/src/clock.rs @@ -0,0 +1,81 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Clock interface and implementations for testability. + +#[cfg(test)] use std::sync::Mutex; +use std::thread; +use time; + +/// Abstract interface to the system clock. This is for testability. +pub trait Clock : Sync { + /// Gets the current time. + fn get_time(&self) -> time::Timespec; + + /// Causes the current thread to sleep for the specified time. + fn sleep(&self, how_long: time::Duration); +} + +/// Singleton "real" clock. +pub static REAL: RealClock = RealClock {}; + +/// Real clock; see static `REAL` instance. +pub struct RealClock {} + +impl Clock for RealClock { + fn get_time(&self) -> time::Timespec { time::get_time() } + + fn sleep(&self, how_long: time::Duration) { + match how_long.to_std() { + Ok(d) => thread::sleep(d), + Err(e) => warn!("Invalid duration {:?}: {}", how_long, e), + }; + } +} + +/// Simulated clock for testing. +#[cfg(test)] +pub struct SimulatedClock(Mutex); + +#[cfg(test)] +impl SimulatedClock { + pub fn new() -> SimulatedClock { SimulatedClock(Mutex::new(time::Timespec::new(0, 0))) } +} + +#[cfg(test)] +impl Clock for SimulatedClock { + fn get_time(&self) -> time::Timespec { *self.0.lock().unwrap() } + + /// Advances the clock by the specified amount without actually sleeping. + fn sleep(&self, how_long: time::Duration) { + let mut l = self.0.lock().unwrap(); + *l = *l + how_long; + } +} diff --git a/src/dir.rs b/src/dir.rs index e27f1bf..e5bc9c5 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -30,13 +30,13 @@ //! Sample file directory management. //! -//! This includes opening files for serving, rotating away old -//! files, and syncing new files to disk. +//! This includes opening files for serving, rotating away old files, and saving new files. use db; use libc; use recording; use error::Error; +use openssl::crypto::hash; use std::ffi; use std::fs; use std::io::{self, Write}; @@ -120,9 +120,9 @@ impl SampleFileDir { /// Note this doesn't wait for previous rotation to complete; it's assumed the sample file /// directory has sufficient space for a couple recordings per camera in addition to the /// cameras' total `retain_bytes`. - pub fn create_writer(&self, start: recording::Time, local_start: recording::Time, - camera_id: i32, video_sample_entry_id: i32) - -> Result { + pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time, + local_start: recording::Time, camera_id: i32, + video_sample_entry_id: i32) -> Result, Error> { // Grab the next uuid. Typically one is cached—a sync has usually completed since the last // writer was created, and syncs ensure `next_uuid` is filled while performing their // transaction. But if not, perform an extra database transaction to reserve a new one. @@ -145,7 +145,7 @@ impl SampleFileDir { return Err(e.into()); }, }; - recording::Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id) + Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id, channel) } /// Opens a sample file within this directory with the given flags and (if creating) mode. @@ -197,7 +197,8 @@ struct SharedMutableState { /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { - AsyncSaveWriter(db::RecordingToInsert, fs::File), + AsyncSaveRecording(db::RecordingToInsert, fs::File), + AsyncAbandonRecording(Uuid), #[cfg(test)] Flush(mpsc::SyncSender<()>), @@ -242,10 +243,12 @@ pub fn start_syncer(dir: Arc) impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - pub fn async_save_writer(&self, w: recording::Writer) -> Result<(), Error> { - let (recording, f) = w.close()?; - self.0.send(SyncerCommand::AsyncSaveWriter(recording, f)).unwrap(); - Ok(()) + fn async_save_recording(&self, recording: db::RecordingToInsert, f: fs::File) { + self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap(); + } + + fn async_abandon_recording(&self, uuid: Uuid) { + self.0.send(SyncerCommand::AsyncAbandonRecording(uuid)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. @@ -262,8 +265,8 @@ impl SyncerState { loop { match self.cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown - Ok(SyncerCommand::AsyncSaveWriter(recording, f)) => self.save_writer(recording, f), - + Ok(SyncerCommand::AsyncSaveRecording(recording, f)) => self.save(recording, f), + Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), #[cfg(test)] Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. }; @@ -301,11 +304,11 @@ impl SyncerState { Ok(()) } - /// Saves the given writer and causes rotation to happen. + /// Saves the given recording and causes rotation to happen. /// Note that part of rotation is deferred for the next cycle (saved writing or program startup) /// so that there can be only one dir sync and database transaction per save. - fn save_writer(&mut self, recording: db::RecordingToInsert, f: fs::File) { - if let Err(e) = self.save_writer_helper(&recording, f) { + fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { + if let Err(e) = self.save_helper(&recording, f) { error!("camera {}: will discard recording {} due to error while saving: {}", recording.camera_id, recording.sample_file_uuid, e); self.to_unlink.push(recording.sample_file_uuid); @@ -313,10 +316,15 @@ impl SyncerState { } } - /// Internal helper for `save_writer`. This is separated out so that the question-mark operator + fn abandon(&mut self, uuid: Uuid) { + self.to_unlink.push(uuid); + self.try_unlink(); + } + + /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save_writer_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File) - -> Result<(), Error> { + fn save_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File) + -> Result<(), Error> { self.try_unlink(); if !self.to_unlink.is_empty() { return Err(Error::new(format!("failed to unlink {} files.", self.to_unlink.len()))); @@ -398,3 +406,143 @@ impl SyncerState { }); } } + +/// Single-use struct to write a single recording to disk and commit its metadata to the database. +/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the +/// syncer when done. It either saves the recording to the database (if I/O errors do not prevent +/// this) or marks it as abandoned so that the syncer will attempt to unlink the file. +pub struct Writer<'a>(Option>); + +/// The state associated with a `Writer`. The indirection is for the `Drop` trait; `close` moves +/// `f` and `index.video_index` out of the `InnerWriter`, which is not allowed on a struct with +/// a `Drop` trait. To avoid this problem, the real state is surrounded by an `Option`. The +/// `Option` should none only after close is called, and thus never in a way visible to callers. +struct InnerWriter<'a> { + syncer_channel: &'a SyncerChannel, + f: fs::File, + index: recording::SampleIndexEncoder, + uuid: Uuid, + corrupt: bool, + hasher: hash::Hasher, + start_time: recording::Time, + local_time: recording::Time, + camera_id: i32, + video_sample_entry_id: i32, + + /// A sample which has been written to disk but not added to `index`. Index writes are one + /// sample behind disk writes because the duration of a sample is the difference between its + /// pts and the next sample's pts. A sample is flushed when the next sample is written, when + /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is + /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). + unflushed_sample: Option, +} + +struct UnflushedSample { + pts_90k: i64, + len: i32, + is_key: bool, +} + +impl<'a> Writer<'a> { + /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). + fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time, + camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) + -> Result { + Ok(Writer(Some(InnerWriter{ + syncer_channel: syncer_channel, + f: f, + index: recording::SampleIndexEncoder::new(), + uuid: uuid, + corrupt: false, + hasher: hash::Hasher::new(hash::Type::SHA1)?, + start_time: start_time, + local_time: local_time, + camera_id: camera_id, + video_sample_entry_id: video_sample_entry_id, + unflushed_sample: None, + }))) + } + + pub fn write(&mut self, pkt: &[u8], pts_90k: i64, is_key: bool) -> Result<(), Error> { + let w = self.0.as_mut().unwrap(); + if let Some(unflushed) = w.unflushed_sample.take() { + let duration = (pts_90k - unflushed.pts_90k) as i32; + w.index.add_sample(duration, unflushed.len, unflushed.is_key); + } + let mut remaining = pkt; + while !remaining.is_empty() { + let written = match w.f.write(remaining) { + Ok(b) => b, + Err(e) => { + if remaining.len() < pkt.len() { + // Partially written packet. Truncate if possible. + if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) { + error!("After write to {} failed with {}, truncate failed with {}; \ + sample file is corrupt.", w.uuid.hyphenated(), e, e2); + w.corrupt = true; + } + } + return Err(Error::from(e)); + }, + }; + remaining = &remaining[written..]; + } + w.unflushed_sample = Some(UnflushedSample{ + pts_90k: pts_90k, + len: pkt.len() as i32, + is_key: is_key}); + w.hasher.update(pkt)?; + Ok(()) + } + + /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's + /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, + /// swallowing errors and using a zero duration for the last sample. + pub fn close(mut self, next_pts: Option) -> Result { + self.0.take().unwrap().close(next_pts) + } +} + +impl<'a> InnerWriter<'a> { + fn close(mut self, next_pts: Option) -> Result { + if self.corrupt { + self.syncer_channel.async_abandon_recording(self.uuid); + return Err(Error::new(format!("recording {} is corrupt", self.uuid))); + } + if let Some(unflushed) = self.unflushed_sample.take() { + let duration = match next_pts { + None => 0, + Some(p) => (p - unflushed.pts_90k) as i32, + }; + self.index.add_sample(duration, unflushed.len, unflushed.is_key); + } + let mut sha1_bytes = [0u8; 20]; + sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); + let end = self.start_time + recording::Duration(self.index.total_duration_90k as i64); + let recording = db::RecordingToInsert{ + camera_id: self.camera_id, + sample_file_bytes: self.index.sample_file_bytes, + time: self.start_time .. end, + local_time: self.local_time, + video_samples: self.index.video_samples, + video_sync_samples: self.index.video_sync_samples, + video_sample_entry_id: self.video_sample_entry_id, + sample_file_uuid: self.uuid, + video_index: self.index.video_index, + sample_file_sha1: sha1_bytes, + }; + self.syncer_channel.async_save_recording(recording, self.f); + Ok(end) + } +} + +impl<'a> Drop for Writer<'a> { + fn drop(&mut self) { + if let Some(w) = self.0.take() { + // Swallow any error. The caller should only drop the Writer without calling close() + // if there's already been an error. The caller should report that. No point in + // complaining again. + let _ = w.close(None); + } + } +} diff --git a/src/main.rs b/src/main.rs index a5da2ae..b0ab6c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,6 +71,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; +mod clock; mod db; mod dir; mod error; @@ -157,11 +158,18 @@ fn main() { let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap(); let l = db.lock(); let cameras = l.cameras_by_id().len(); + let env = streamer::Environment{ + db: &db, + dir: &dir, + clock: &clock::REAL, + opener: &*stream::FFMPEG, + shutdown: &shutdown, + }; for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() { let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64; - let mut streamer = streamer::Streamer::new( - db.clone(), dir.clone(), syncer_channel.clone(), shutdown.clone(), *id, camera, - rotate_offset_sec); + let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, + rotate_offset_sec, + streamer::ROTATE_INTERVAL_SEC); let name = format!("stream-{}", streamer.short_name()); streamers.push(thread::Builder::new().name(name).spawn(move|| { streamer.run(); diff --git a/src/mp4.rs b/src/mp4.rs index 441b778..0058a0c 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1200,7 +1200,7 @@ mod tests { use std::sync::Arc; use std::str; use super::*; - use stream::StreamSource; + use stream::{self, Opener, Stream}; use testutil::{self, TestDb}; use uuid::Uuid; @@ -1397,25 +1397,34 @@ mod tests { } fn copy_mp4_to_db(db: &TestDb) { - let mut input = StreamSource::File("src/testdata/clip.mp4").open().unwrap(); + let mut input = + stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); // 2015-04-26 00:00:00 UTC. const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); let extra_data = input.get_extra_data().unwrap(); let video_sample_entry_id = db.db.lock().insert_video_sample_entry( extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap(); - let mut output = db.dir.create_writer(START_TIME, START_TIME, TEST_CAMERA_ID, - video_sample_entry_id).unwrap(); + let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, + TEST_CAMERA_ID, video_sample_entry_id).unwrap(); + + // end_pts is the pts of the end of the most recent frame (start + duration). + // It's needed because dir::Writer calculates a packet's duration from its pts and the + // next packet's pts. That's more accurate for RTSP than ffmpeg's estimate of duration. + // To write the final packet of this sample .mp4 with a full duration, we need to fake a + // next packet's pts from the ffmpeg-supplied duration. + let mut end_pts = None; loop { let pkt = match input.get_next() { Ok(p) => p, Err(ffmpeg::Error::Eof) => { break; }, Err(e) => { panic!("unexpected input error: {}", e); }, }; - output.write(pkt.data().expect("packet without data"), pkt.duration() as i32, + output.write(pkt.data().expect("packet without data"), pkt.pts().unwrap(), pkt.is_key()).unwrap(); + end_pts = Some(pkt.pts().unwrap() + pkt.duration()); } - db.syncer_channel.async_save_writer(output).unwrap(); + output.close(end_pts).unwrap(); db.syncer_channel.flush(); } @@ -1475,8 +1484,8 @@ mod tests { } fn compare_mp4s(new_filename: &str, pts_offset: i64, shorten: i64) { - let mut orig = StreamSource::File("src/testdata/clip.mp4").open().unwrap(); - let mut new = StreamSource::File(new_filename).open().unwrap(); + let mut orig = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); + let mut new = stream::FFMPEG.open(stream::Source::File(new_filename)).unwrap(); assert_eq!(orig.get_extra_data().unwrap(), new.get_extra_data().unwrap()); let mut final_durations = None; loop { diff --git a/src/recording.rs b/src/recording.rs index 59502d1..8497d01 100644 --- a/src/recording.rs +++ b/src/recording.rs @@ -35,15 +35,11 @@ extern crate uuid; use db; use std::ops; use error::Error; -use openssl::crypto::hash; use std::fmt; -use std::fs; -use std::io::Write; use std::ops::Range; use std::string::String; use std::sync::MutexGuard; use time; -use uuid::Uuid; pub const TIME_UNITS_PER_SEC: i64 = 90000; pub const DESIRED_RECORDING_DURATION: i64 = 60 * TIME_UNITS_PER_SEC; @@ -170,18 +166,6 @@ pub struct SampleIndexEncoder { pub video_index: Vec, } -pub struct Writer { - f: fs::File, - index: SampleIndexEncoder, - uuid: Uuid, - corrupt: bool, - hasher: hash::Hasher, - start_time: Time, - local_time: Time, - camera_id: i32, - video_sample_entry_id: i32, -} - /// Zigzag-encodes a signed integer, as in [protocol buffer /// encoding](https://developers.google.com/protocol-buffers/docs/encoding#types). Uses the low bit /// to indicate signedness (1 = negative, 0 = non-negative). @@ -355,72 +339,6 @@ impl SampleIndexEncoder { } } -impl Writer { - pub fn open(f: fs::File, uuid: Uuid, start_time: Time, local_time: Time, - camera_id: i32, video_sample_entry_id: i32) -> Result { - Ok(Writer{ - f: f, - index: SampleIndexEncoder::new(), - uuid: uuid, - corrupt: false, - hasher: hash::Hasher::new(hash::Type::SHA1)?, - start_time: start_time, - local_time: local_time, - camera_id: camera_id, - video_sample_entry_id: video_sample_entry_id, - }) - } - - pub fn write(&mut self, pkt: &[u8], duration_90k: i32, is_key: bool) -> Result<(), Error> { - let mut remaining = pkt; - while !remaining.is_empty() { - let written = match self.f.write(remaining) { - Ok(b) => b, - Err(e) => { - if remaining.len() < pkt.len() { - // Partially written packet. Truncate if possible. - if let Err(e2) = self.f.set_len(self.index.sample_file_bytes as u64) { - error!("After write to {} failed with {}, truncate failed with {}; \ - sample file is corrupt.", self.uuid.hyphenated(), e, e2); - self.corrupt = true; - } - } - return Err(Error::from(e)); - }, - }; - remaining = &remaining[written..]; - } - self.index.add_sample(duration_90k, pkt.len() as i32, is_key); - self.hasher.update(pkt)?; - Ok(()) - } - - pub fn end(&self) -> Time { - self.start_time + Duration(self.index.total_duration_90k as i64) - } - - // TODO: clean up this interface. - pub fn close(mut self) -> Result<(db::RecordingToInsert, fs::File), Error> { - if self.corrupt { - return Err(Error::new(format!("recording {} is corrupt", self.uuid))); - } - let mut sha1_bytes = [0u8; 20]; - sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); - Ok((db::RecordingToInsert{ - camera_id: self.camera_id, - sample_file_bytes: self.index.sample_file_bytes, - time: self.start_time .. self.end(), - local_time: self.local_time, - video_samples: self.index.video_samples, - video_sync_samples: self.index.video_sync_samples, - video_sample_entry_id: self.video_sample_entry_id, - sample_file_uuid: self.uuid, - video_index: self.index.video_index, - sample_file_sha1: sha1_bytes, - }, self.f)) - } -} - /// A segment represents a view of some or all of a single recording, starting from a key frame. /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. pub struct Segment { diff --git a/src/stream.rs b/src/stream.rs index 7008059..a3896f7 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -41,7 +41,11 @@ use std::sync; static START: sync::Once = sync::ONCE_INIT; -pub enum StreamSource<'a> { +lazy_static! { + pub static ref FFMPEG: Ffmpeg = Ffmpeg::new(); +} + +pub enum Source<'a> { #[cfg(test)] File(&'a str), // filename, for testing. @@ -76,21 +80,36 @@ extern "C" fn lock_callback(untyped_ptr: *mut *mut c_void, op: AVLockOp) -> c_in 0 } -impl<'a> StreamSource<'a> { - pub fn open(&self) -> Result { +pub trait Opener : Sync { + fn open(&self, src: Source) -> Result; +} + +pub trait Stream { + fn get_extra_data(&self) -> Result; + fn get_next(&mut self) -> Result; +} + +pub struct Ffmpeg {} + +impl Ffmpeg { + fn new() -> Ffmpeg { START.call_once(|| { unsafe { ffmpeg_sys::av_lockmgr_register(lock_callback); }; ffmpeg::init().unwrap(); ffmpeg::format::network::init(); - }); + Ffmpeg{} + } +} - let (input, discard_first) = match *self { +impl Opener for Ffmpeg { + fn open(&self, src: Source) -> Result { + let (input, discard_first) = match src { #[cfg(test)] - StreamSource::File(filename) => + Source::File(filename) => (format::input_with(&format!("file:{}", filename), ffmpeg::Dictionary::new())?, false), - StreamSource::Rtsp(url) => { + Source::Rtsp(url) => { let open_options = dict![ "rtsp_transport" => "tcp", // https://trac.ffmpeg.org/ticket/5018 workaround attempt. @@ -117,7 +136,7 @@ impl<'a> StreamSource<'a> { None => { return Err(Error::new("no video stream".to_owned())) }, }; - let mut stream = Stream{ + let mut stream = FfmpegStream{ input: input, video_i: video_i, }; @@ -131,13 +150,13 @@ impl<'a> StreamSource<'a> { } } -pub struct Stream { +pub struct FfmpegStream { input: format::context::Input, video_i: usize, } -impl Stream { - pub fn get_extra_data(&self) -> Result { +impl Stream for FfmpegStream { + fn get_extra_data(&self) -> Result { let video = self.input.stream(self.video_i).expect("can't get video stream known to exist"); let codec = video.codec(); let (extradata, width, height) = unsafe { @@ -150,7 +169,7 @@ impl Stream { h264::ExtraData::parse(extradata, width, height) } - pub fn get_next(&mut self) -> Result { + fn get_next(&mut self) -> Result { let mut pkt = ffmpeg::Packet::empty(); loop { pkt.read(&mut self.input)?; diff --git a/src/streamer.rs b/src/streamer.rs index 213ff97..2734c8d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -28,6 +28,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use clock::Clock; use db::{Camera, Database}; use dir; use error::Error; @@ -36,37 +37,50 @@ use recording; use std::result::Result; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread; -use std::time::Duration; -use stream::StreamSource; +use stream; use time; pub static ROTATE_INTERVAL_SEC: i64 = 60; -pub struct Streamer { +/// Common state that can be used by multiple `Streamer` instances. +pub struct Environment<'a, 'b, C, S> where C: 'a + Clock, S: 'a + stream::Stream { + pub clock: &'a C, + pub opener: &'a stream::Opener, + pub db: &'b Arc, + pub dir: &'b Arc, + pub shutdown: &'b Arc, +} + +pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { shutdown: Arc, // State below is only used by the thread in Run. rotate_offset_sec: i64, + rotate_interval_sec: i64, db: Arc, dir: Arc, syncer_channel: dir::SyncerChannel, + clock: &'a C, + opener: &'a stream::Opener, camera_id: i32, short_name: String, url: String, redacted_url: String, } -impl Streamer { - pub fn new(db: Arc, dir: Arc, syncer_channel: dir::SyncerChannel, - shutdown: Arc, camera_id: i32, c: &Camera, rotate_offset_sec: i64) - -> Self { +impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { + pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, + camera_id: i32, c: &Camera, rotate_offset_sec: i64, + rotate_interval_sec: i64) -> Self { Streamer{ - shutdown: shutdown, + shutdown: env.shutdown.clone(), rotate_offset_sec: rotate_offset_sec, - db: db, - dir: dir, + rotate_interval_sec: rotate_interval_sec, + db: env.db.clone(), + dir: env.dir.clone(), syncer_channel: syncer_channel, + clock: env.clock, + opener: env.opener, camera_id: camera_id, short_name: c.short_name.to_owned(), url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, c.main_rtsp_path), @@ -79,9 +93,9 @@ impl Streamer { pub fn run(&mut self) { while !self.shutdown.load(Ordering::SeqCst) { if let Err(e) = self.run_once() { - let sleep_time = Duration::from_secs(1); + let sleep_time = time::Duration::seconds(1); warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e); - thread::sleep(sleep_time); + self.clock.sleep(sleep_time); } } info!("{}: shutting down", self.short_name); @@ -90,8 +104,7 @@ impl Streamer { fn run_once(&mut self) -> Result<(), Error> { info!("{}: Opening input: {}", self.short_name, self.redacted_url); - // TODO: mockability? - let mut stream = StreamSource::Rtsp(&self.url).open()?; + let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?; // TODO: verify time base. // TODO: verify width/height. let extra_data = stream.get_extra_data()?; @@ -101,39 +114,38 @@ impl Streamer { debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); let mut seen_key_frame = false; let mut rotate = None; - let mut writer: Option = None; + let mut writer: Option = None; let mut transformed = Vec::new(); let mut next_start = None; while !self.shutdown.load(Ordering::SeqCst) { let pkt = stream.get_next()?; + let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?; if !seen_key_frame && !pkt.is_key() { continue; } else if !seen_key_frame { debug!("{}: have first key frame", self.short_name); seen_key_frame = true; } - let frame_realtime = time::get_time(); + let frame_realtime = self.clock.get_time(); if let Some(r) = rotate { if frame_realtime.sec > r && pkt.is_key() { let w = writer.take().expect("rotate set implies writer is set"); - next_start = Some(w.end()); - // TODO: restore this log message. - // info!("{}: wrote {}: [{}, {})", self.short_name, r.sample_file_uuid, - // r.time.start, r.time.end); - self.syncer_channel.async_save_writer(w)?; + trace!("{}: write on normal rotation", self.short_name); + next_start = Some(w.close(Some(pts))?); } }; let mut w = match writer { Some(w) => w, None => { let r = frame_realtime.sec - - (frame_realtime.sec % ROTATE_INTERVAL_SEC) + + (frame_realtime.sec % self.rotate_interval_sec) + self.rotate_offset_sec; rotate = Some( - if r <= frame_realtime.sec { r + ROTATE_INTERVAL_SEC } else { r }); + if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r }); let local_realtime = recording::Time::new(frame_realtime); - self.dir.create_writer(next_start.unwrap_or(local_realtime), local_realtime, + self.dir.create_writer(&self.syncer_channel, + next_start.unwrap_or(local_realtime), local_realtime, self.camera_id, video_sample_entry_id)? }, }; @@ -147,12 +159,188 @@ impl Streamer { } else { orig_data }; - w.write(transformed_data, pkt.duration() as i32, pkt.is_key())?; + w.write(transformed_data, pts, pkt.is_key())?; writer = Some(w); } if let Some(w) = writer { - self.syncer_channel.async_save_writer(w)?; + w.close(None)?; } Ok(()) } } + +#[cfg(test)] +mod tests { + use clock::{self, Clock}; + use db; + use error::Error; + use ffmpeg; + use ffmpeg::packet::Mut; + use h264; + use recording; + use std::sync::{Arc, Mutex, MutexGuard}; + use std::sync::atomic::{AtomicBool, Ordering}; + use stream::{self, Opener, Stream}; + use testutil; + use time; + + struct ProxyingStream<'a> { + clock: &'a clock::SimulatedClock, + inner: stream::FfmpegStream, + last_duration: time::Duration, + ts_offset: i64, + ts_offset_pkts_left: u32, + pkts_left: u32, + } + + impl<'a> ProxyingStream<'a> { + fn new(clock: &'a clock::SimulatedClock, inner: stream::FfmpegStream) -> ProxyingStream { + ProxyingStream { + clock: clock, + inner: inner, + last_duration: time::Duration::seconds(0), + ts_offset: 0, + ts_offset_pkts_left: 0, + pkts_left: 0, + } + } + } + + impl<'a> Stream for ProxyingStream<'a> { + fn get_next(&mut self) -> Result { + if self.pkts_left == 0 { + return Err(ffmpeg::Error::Eof); + } + self.pkts_left -= 1; + + // Advance clock to when this packet starts. + self.clock.sleep(self.last_duration); + + let mut pkt = self.inner.get_next()?; + + self.last_duration = time::Duration::nanoseconds( + pkt.duration() * 1_000_000_000 / recording::TIME_UNITS_PER_SEC); + + if self.ts_offset_pkts_left > 0 { + self.ts_offset_pkts_left -= 1; + let old_pts = pkt.pts().unwrap(); + let old_dts = pkt.dts(); + unsafe { + let pkt = pkt.as_mut_ptr(); + (*pkt).pts = old_pts + self.ts_offset; + (*pkt).dts = old_dts + self.ts_offset; + + // In a real rtsp stream, the duration of a packet is not known until the + // next packet. ffmpeg's duration is an unreliable estimate. + (*pkt).duration = recording::TIME_UNITS_PER_SEC as i32; + } + } + + Ok(pkt) + } + + fn get_extra_data(&self) -> Result { self.inner.get_extra_data() } + } + + struct MockOpener<'a> { + expected_url: String, + streams: Mutex>>, + shutdown: Arc, + } + + impl<'a> stream::Opener> for MockOpener<'a> { + fn open(&self, src: stream::Source) -> Result, Error> { + match src { + stream::Source::Rtsp(url) => assert_eq!(url, &self.expected_url), + stream::Source::File(_) => panic!("expected rtsp url"), + }; + let mut l = self.streams.lock().unwrap(); + match l.pop() { + Some(stream) => { + trace!("MockOpener returning next stream"); + Ok(stream) + }, + None => { + trace!("MockOpener shutting down"); + self.shutdown.store(true, Ordering::SeqCst); + Err(Error::new("done".to_owned())) + }, + } + } + } + + #[derive(Debug, Eq, PartialEq)] + struct Frame { + start_90k: i32, + duration_90k: i32, + is_key: bool, + } + + fn get_frames(db: &MutexGuard, recording_id: i64) -> Vec { + let rec = db.get_recording(recording_id).unwrap(); + let mut it = recording::SampleIndexIterator::new(); + let mut frames = Vec::new(); + while it.next(&rec.video_index).unwrap() { + frames.push(Frame{ + start_90k: it.start_90k, + duration_90k: it.duration_90k, + is_key: it.is_key, + }); + } + frames + } + + #[test] + fn basic() { + testutil::init(); + let clock = clock::SimulatedClock::new(); + clock.sleep(time::Duration::seconds(1430006400)); // 2015-04-26 00:00:00 UTC + let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); + let mut stream = ProxyingStream::new(&clock, stream); + stream.ts_offset = 180000; // starting pts of the input should be irrelevant + stream.ts_offset_pkts_left = u32::max_value(); + stream.pkts_left = u32::max_value(); + let opener = MockOpener{ + expected_url: "rtsp://foo:bar@test-camera/main".to_owned(), + streams: Mutex::new(vec![stream]), + shutdown: Arc::new(AtomicBool::new(false)), + }; + let db = testutil::TestDb::new(); + let env = super::Environment{ + clock: &clock, + opener: &opener, + db: &db.db, + dir: &db.dir, + shutdown: &opener.shutdown, + }; + let mut stream; + { + let l = db.db.lock(); + let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap(); + stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_CAMERA_ID, + camera, 0, 5); + } + stream.run(); + assert!(opener.streams.lock().unwrap().is_empty()); + db.syncer_channel.flush(); + let db = db.db.lock(); + + // Compare frame-by-frame. Note below that while the rotation is scheduled to happen near + // 5-second boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next key + // frame, which in this case is 00:00:07. + assert_eq!(get_frames(&db, 1), &[ + Frame{start_90k: 0, duration_90k: 90379, is_key: true}, + Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, + Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, + Frame{start_90k: 270012, duration_90k: 89981, is_key: false}, + Frame{start_90k: 359993, duration_90k: 90055, is_key: true}, + Frame{start_90k: 450048, duration_90k: 89967, is_key: false}, // pts_time 5.0005333... + Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, + Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, + ]); + assert_eq!(get_frames(&db, 2), &[ + Frame{start_90k: 0, duration_90k: 90011, is_key: true}, + Frame{start_90k: 90011, duration_90k: 0, is_key: false}, + ]); + } +}