diff --git a/src/clock.rs b/src/clock.rs index f871d8d..c746e70 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -30,29 +30,45 @@ //! Clock interface and implementations for testability. +use libc; #[cfg(test)] use std::sync::Mutex; +use std::mem; use std::thread; -use time; +use time::{Duration, Timespec}; -/// Abstract interface to the system clock. This is for testability. -pub trait Clock : Sync { - /// Gets the current time. - fn get_time(&self) -> time::Timespec; +/// Abstract interface to the system clocks. This is for testability. +pub trait Clocks : Sync { + /// Gets the current time from `CLOCK_REALTIME`. + fn realtime(&self) -> Timespec; + + /// Gets the current time from `CLOCK_MONOTONIC`. + fn monotonic(&self) -> Timespec; /// Causes the current thread to sleep for the specified time. - fn sleep(&self, how_long: time::Duration); + fn sleep(&self, how_long: Duration); } -/// Singleton "real" clock. -pub static REAL: RealClock = RealClock {}; +/// Singleton "real" clocks. +pub static REAL: RealClocks = RealClocks {}; -/// Real clock; see static `REAL` instance. -pub struct RealClock {} +/// Real clocks; see static `REAL` instance. +pub struct RealClocks {} -impl Clock for RealClock { - fn get_time(&self) -> time::Timespec { time::get_time() } +impl RealClocks { + fn get(&self, clock: libc::clockid_t) -> Timespec { + unsafe { + let mut ts = mem::uninitialized(); + assert_eq!(0, libc::clock_gettime(clock, &mut ts)); + Timespec::new(ts.tv_sec, ts.tv_nsec as i32) + } + } +} - fn sleep(&self, how_long: time::Duration) { +impl Clocks for RealClocks { + fn realtime(&self) -> Timespec { self.get(libc::CLOCK_REALTIME) } + fn monotonic(&self) -> Timespec { self.get(libc::CLOCK_MONOTONIC) } + + fn sleep(&self, how_long: Duration) { match how_long.to_std() { Ok(d) => thread::sleep(d), Err(e) => warn!("Invalid duration {:?}: {}", how_long, e), @@ -62,20 +78,29 @@ impl Clock for RealClock { /// Simulated clock for testing. #[cfg(test)] -pub struct SimulatedClock(Mutex); - -#[cfg(test)] -impl SimulatedClock { - pub fn new() -> SimulatedClock { SimulatedClock(Mutex::new(time::Timespec::new(0, 0))) } +pub struct SimulatedClocks { + boot: Timespec, + uptime: Mutex, } #[cfg(test)] -impl Clock for SimulatedClock { - fn get_time(&self) -> time::Timespec { *self.0.lock().unwrap() } +impl SimulatedClocks { + pub fn new(boot: Timespec) -> SimulatedClocks { + SimulatedClocks { + boot: boot, + uptime: Mutex::new(Duration::seconds(0)), + } + } +} + +#[cfg(test)] +impl Clocks for SimulatedClocks { + fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock().unwrap() } + fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock().unwrap() } /// Advances the clock by the specified amount without actually sleeping. - fn sleep(&self, how_long: time::Duration) { - let mut l = self.0.lock().unwrap(); + fn sleep(&self, how_long: Duration) { + let mut l = self.uptime.lock().unwrap(); *l = *l + how_long; } } diff --git a/src/db.rs b/src/db.rs index 408b237..77b4092 100644 --- a/src/db.rs +++ b/src/db.rs @@ -253,7 +253,7 @@ pub struct RecordingToInsert { pub flags: i32, pub sample_file_bytes: i32, pub time: Range, - pub local_time: recording::Time, + pub local_time_delta: recording::Duration, pub video_samples: i32, pub video_sync_samples: i32, pub video_sample_entry_id: i32, @@ -640,7 +640,7 @@ impl<'a> Transaction<'a> { (":sample_file_bytes", &r.sample_file_bytes), (":start_time_90k", &r.time.start.0), (":duration_90k", &(r.time.end.0 - r.time.start.0)), - (":local_time_delta_90k", &(r.local_time.0 - r.time.start.0)), + (":local_time_delta_90k", &r.local_time_delta.0), (":video_samples", &r.video_samples), (":video_sync_samples", &r.video_sync_samples), (":video_sample_entry_id", &r.video_sample_entry_id), @@ -1460,7 +1460,7 @@ mod tests { run_offset: 0, flags: 0, time: start .. start + recording::Duration(TIME_UNITS_PER_SEC), - local_time: start, + local_time_delta: recording::Duration(0), video_samples: 1, video_sync_samples: 1, video_sample_entry_id: vse_id, diff --git a/src/dir.rs b/src/dir.rs index 4398069..82571ea 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -438,6 +438,8 @@ struct InnerWriter<'a> { /// information. This will be used as the official start time iff `prev_end` is None. local_start: Option, + adjuster: ClockAdjuster, + camera_id: i32, video_sample_entry_id: i32, run_offset: i32, @@ -450,6 +452,51 @@ struct InnerWriter<'a> { unflushed_sample: Option, } +/// Adjusts durations given by the camera to correct its clock frequency error. +struct ClockAdjuster { + /// Every `every_minus_1 + 1` units, add `-ndir`. + /// Note i32::max_value() disables adjustment. + every_minus_1: i32, + + /// Should be 1 or -1 (unless disabled). + ndir: i32, + + /// Keeps accumulated difference from previous values. + cur: i32, +} + +impl ClockAdjuster { + fn new(local_time_delta: Option) -> Self { + // Correct up to 500 ppm, or 2,700/90,000ths of a second over the course of a minute. + let (every, ndir) = match local_time_delta { + None | Some(0) => (i32::max_value(), 0), + Some(d) if d <= -2700 => (2000, 1), + Some(d) if d >= 2700 => (2000, -1), + Some(d) if d < -60 => ((60 * 90000) / -(d as i32), 1), + Some(d) => ((60 * 90000) / (d as i32), -1), + }; + ClockAdjuster{ + every_minus_1: every - 1, + ndir: ndir, + cur: 0, + } + } + + fn adjust(&mut self, mut val: i32) -> i32 { + self.cur += val; + + // The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't + // cause a duration of 1 to become a duration of 0. It has no effect when increasing + // durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't + // be newly > self.every_minus_1.) + while self.cur > self.every_minus_1 && val > self.ndir { + val -= self.ndir; + self.cur -= self.every_minus_1 + 1; + } + val + } +} + struct UnflushedSample { local_time: recording::Time, pts_90k: i64, @@ -460,6 +507,7 @@ struct UnflushedSample { #[derive(Copy, Clone)] pub struct PreviousWriter { end_time: recording::Time, + local_time_delta: recording::Duration, run_offset: i32, } @@ -476,9 +524,10 @@ impl<'a> Writer<'a> { hasher: hash::Hasher::new(hash::Type::SHA1)?, prev_end: prev.map(|p| p.end_time), local_start: None, + adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), camera_id: camera_id, video_sample_entry_id: video_sample_entry_id, - run_offset: prev.map(|p| p.run_offset).unwrap_or(0), + run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, }))) } @@ -489,7 +538,7 @@ impl<'a> Writer<'a> { is_key: bool) -> Result<(), Error> { let w = self.0.as_mut().unwrap(); if let Some(unflushed) = w.unflushed_sample.take() { - let duration = (pts_90k - unflushed.pts_90k) as i32; + let duration = w.adjuster.adjust((pts_90k - unflushed.pts_90k) as i32); w.index.add_sample(duration, unflushed.len, unflushed.is_key); w.local_start = Some(w.extend_local_start(unflushed.local_time)); } @@ -544,23 +593,24 @@ impl<'a> InnerWriter<'a> { } let unflushed = self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?; - let duration = match next_pts { + let duration = self.adjuster.adjust(match next_pts { None => 0, Some(p) => (p - unflushed.pts_90k) as i32, - }; + }); self.index.add_sample(duration, unflushed.len, unflushed.is_key); let local_start = self.extend_local_start(unflushed.local_time); let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); - let start_time = self.prev_end.unwrap_or(local_start); - let end = start_time + recording::Duration(self.index.total_duration_90k as i64); + let start = self.prev_end.unwrap_or(local_start); + let end = start + recording::Duration(self.index.total_duration_90k as i64); let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } else { 0 }; + let local_start_delta = local_start - start; let recording = db::RecordingToInsert{ camera_id: self.camera_id, sample_file_bytes: self.index.sample_file_bytes, - time: start_time .. end, - local_time: local_start, + time: start .. end, + local_time_delta: local_start_delta, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, video_sample_entry_id: self.video_sample_entry_id, @@ -573,6 +623,7 @@ impl<'a> InnerWriter<'a> { self.syncer_channel.async_save_recording(recording, self.f); Ok(PreviousWriter{ end_time: end, + local_time_delta: local_start_delta, run_offset: self.run_offset, }) } @@ -588,3 +639,48 @@ impl<'a> Drop for Writer<'a> { } } } + +#[cfg(test)] +mod tests { + use super::ClockAdjuster; + use testutil; + + #[test] + fn adjust() { + testutil::init(); + + // no-ops. + let mut a = ClockAdjuster::new(None); + for _ in 0..1800 { + assert_eq!(3000, a.adjust(3000)); + } + a = ClockAdjuster::new(Some(0)); + for _ in 0..1800 { + assert_eq!(3000, a.adjust(3000)); + } + + // typical, 100 ppm adjustment. + a = ClockAdjuster::new(Some(-540)); + let mut total = 0; + for _ in 0..1800 { + let new = a.adjust(3000); + assert!(new == 2999 || new == 3000); + total += new; + } + let expected = 1800*3000 - 540; + assert!(total == expected || total == expected + 1, "total={} vs expected={}", + total, expected); + + // capped at 500 ppm (change of 2,700/90,000ths over 1 minute). + a = ClockAdjuster::new(Some(-1_000_000)); + total = 0; + for _ in 0..1800 { + let new = a.adjust(3000); + assert!(new == 2998 || new == 2999, "new={}", new); + total += new; + } + let expected = 1800*3000 - 2700; + assert!(total == expected || total == expected + 1, "total={} vs expected={}", + total, expected); + } +} diff --git a/src/main.rs b/src/main.rs index f6b44fe..6ef3e66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -181,7 +181,7 @@ fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver. -use clock::Clock; +use clock::Clocks; use db::{Camera, Database}; use dir; use error::Error; @@ -43,15 +43,15 @@ use time; pub static ROTATE_INTERVAL_SEC: i64 = 60; /// Common state that can be used by multiple `Streamer` instances. -pub struct Environment<'a, 'b, C, S> where C: 'a + Clock, S: 'a + stream::Stream { - pub clock: &'a C, +pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { + pub clocks: &'a C, pub opener: &'a stream::Opener, pub db: &'b Arc, pub dir: &'b Arc, pub shutdown: &'b Arc, } -pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { +pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { shutdown: Arc, // State below is only used by the thread in Run. @@ -60,7 +60,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { db: Arc, dir: Arc, syncer_channel: dir::SyncerChannel, - clock: &'a C, + clocks: &'a C, opener: &'a stream::Opener, camera_id: i32, short_name: String, @@ -75,7 +75,7 @@ struct WriterState<'a> { rotate: i64, } -impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { +impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, camera_id: i32, c: &Camera, rotate_offset_sec: i64, rotate_interval_sec: i64) -> Self { @@ -86,7 +86,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { db: env.db.clone(), dir: env.dir.clone(), syncer_channel: syncer_channel, - clock: env.clock, + clocks: env.clocks, opener: env.opener, camera_id: camera_id, short_name: c.short_name.to_owned(), @@ -102,7 +102,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { if let Err(e) = self.run_once() { let sleep_time = time::Duration::seconds(1); warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e); - self.clock.sleep(sleep_time); + self.clocks.sleep(sleep_time); } } info!("{}: shutting down", self.short_name); @@ -112,6 +112,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { info!("{}: Opening input: {}", self.short_name, self.redacted_url); let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?; + let realtime_offset = self.clocks.realtime() - self.clocks.monotonic(); // TODO: verify time base. // TODO: verify width/height. let extra_data = stream.get_extra_data()?; @@ -132,7 +133,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { debug!("{}: have first key frame", self.short_name); seen_key_frame = true; } - let frame_realtime = self.clock.get_time(); + let frame_realtime = self.clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); state = if let Some(s) = state { if frame_realtime.sec > s.rotate && pkt.is_key() { @@ -185,7 +186,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { #[cfg(test)] mod tests { - use clock::{self, Clock}; + use clock::{self, Clocks}; use db; use error::Error; use ffmpeg; @@ -200,7 +201,7 @@ mod tests { use time; struct ProxyingStream<'a> { - clock: &'a clock::SimulatedClock, + clocks: &'a clock::SimulatedClocks, inner: stream::FfmpegStream, buffered: time::Duration, slept: time::Duration, @@ -210,11 +211,11 @@ mod tests { } impl<'a> ProxyingStream<'a> { - fn new(clock: &'a clock::SimulatedClock, buffered: time::Duration, + fn new(clocks: &'a clock::SimulatedClocks, buffered: time::Duration, inner: stream::FfmpegStream) -> ProxyingStream { - clock.sleep(buffered); + clocks.sleep(buffered); ProxyingStream { - clock: clock, + clocks: clocks, inner: inner, buffered: buffered, slept: time::Duration::seconds(0), @@ -244,7 +245,7 @@ mod tests { let duration = goal - self.slept; let buf_part = cmp::min(self.buffered, duration); self.buffered = self.buffered - buf_part; - self.clock.sleep(duration - buf_part); + self.clocks.sleep(duration - buf_part); self.slept = goal; } @@ -321,11 +322,12 @@ mod tests { #[test] fn basic() { testutil::init(); - let clock = clock::SimulatedClock::new(); + // 2015-04-25 00:00:00 UTC + let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0)); + clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC - clock.sleep(time::Duration::seconds(1430006400)); // 2015-04-26 00:00:00 UTC let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); - let mut stream = ProxyingStream::new(&clock, time::Duration::seconds(2), stream); + let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream); stream.ts_offset = 180000; // starting pts of the input should be irrelevant stream.ts_offset_pkts_left = u32::max_value(); stream.pkts_left = u32::max_value(); @@ -336,7 +338,7 @@ mod tests { }; let db = testutil::TestDb::new(); let env = super::Environment{ - clock: &clock, + clocks: &clocks, opener: &opener, db: &db.db, dir: &db.dir, diff --git a/src/testutil.rs b/src/testutil.rs index a9e5dd2..18fab16 100644 --- a/src/testutil.rs +++ b/src/testutil.rs @@ -131,7 +131,7 @@ impl TestDb { sample_file_bytes: encoder.sample_file_bytes, time: START_TIME .. START_TIME + recording::Duration(encoder.total_duration_90k as i64), - local_time: START_TIME, + local_time_delta: recording::Duration(0), video_samples: encoder.video_samples, video_sync_samples: encoder.video_sync_samples, video_sample_entry_id: video_sample_entry_id,