camera clock frequency correction

As described in design/time.md:

* get the realtime-monotonic once at the start of a run and use the
  monotonic clock afterward to avoid problems with local time steps

* on every recording, try to correct the latest local_time_delta at up
  to 500 ppm

Let's see how this works...
This commit is contained in:
Scott Lamb 2016-12-29 21:05:57 -08:00
parent a71f6e66d8
commit 938d8a752f
6 changed files with 177 additions and 54 deletions

View File

@ -30,29 +30,45 @@
//! Clock interface and implementations for testability.
use libc;
#[cfg(test)] use std::sync::Mutex;
use std::mem;
use std::thread;
use time;
use time::{Duration, Timespec};
/// Abstract interface to the system clock. This is for testability.
pub trait Clock : Sync {
/// Gets the current time.
fn get_time(&self) -> time::Timespec;
/// Abstract interface to the system clocks. This is for testability.
pub trait Clocks : Sync {
/// Gets the current time from `CLOCK_REALTIME`.
fn realtime(&self) -> Timespec;
/// Gets the current time from `CLOCK_MONOTONIC`.
fn monotonic(&self) -> Timespec;
/// Causes the current thread to sleep for the specified time.
fn sleep(&self, how_long: time::Duration);
fn sleep(&self, how_long: Duration);
}
/// Singleton "real" clock.
pub static REAL: RealClock = RealClock {};
/// Singleton "real" clocks.
pub static REAL: RealClocks = RealClocks {};
/// Real clock; see static `REAL` instance.
pub struct RealClock {}
/// Real clocks; see static `REAL` instance.
pub struct RealClocks {}
impl Clock for RealClock {
fn get_time(&self) -> time::Timespec { time::get_time() }
impl RealClocks {
fn get(&self, clock: libc::clockid_t) -> Timespec {
unsafe {
let mut ts = mem::uninitialized();
assert_eq!(0, libc::clock_gettime(clock, &mut ts));
Timespec::new(ts.tv_sec, ts.tv_nsec as i32)
}
}
}
fn sleep(&self, how_long: time::Duration) {
impl Clocks for RealClocks {
fn realtime(&self) -> Timespec { self.get(libc::CLOCK_REALTIME) }
fn monotonic(&self) -> Timespec { self.get(libc::CLOCK_MONOTONIC) }
fn sleep(&self, how_long: Duration) {
match how_long.to_std() {
Ok(d) => thread::sleep(d),
Err(e) => warn!("Invalid duration {:?}: {}", how_long, e),
@ -62,20 +78,29 @@ impl Clock for RealClock {
/// Simulated clock for testing.
#[cfg(test)]
pub struct SimulatedClock(Mutex<time::Timespec>);
#[cfg(test)]
impl SimulatedClock {
pub fn new() -> SimulatedClock { SimulatedClock(Mutex::new(time::Timespec::new(0, 0))) }
pub struct SimulatedClocks {
boot: Timespec,
uptime: Mutex<Duration>,
}
#[cfg(test)]
impl Clock for SimulatedClock {
fn get_time(&self) -> time::Timespec { *self.0.lock().unwrap() }
impl SimulatedClocks {
pub fn new(boot: Timespec) -> SimulatedClocks {
SimulatedClocks {
boot: boot,
uptime: Mutex::new(Duration::seconds(0)),
}
}
}
#[cfg(test)]
impl Clocks for SimulatedClocks {
fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock().unwrap() }
fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock().unwrap() }
/// Advances the clock by the specified amount without actually sleeping.
fn sleep(&self, how_long: time::Duration) {
let mut l = self.0.lock().unwrap();
fn sleep(&self, how_long: Duration) {
let mut l = self.uptime.lock().unwrap();
*l = *l + how_long;
}
}

View File

@ -253,7 +253,7 @@ pub struct RecordingToInsert {
pub flags: i32,
pub sample_file_bytes: i32,
pub time: Range<recording::Time>,
pub local_time: recording::Time,
pub local_time_delta: recording::Duration,
pub video_samples: i32,
pub video_sync_samples: i32,
pub video_sample_entry_id: i32,
@ -640,7 +640,7 @@ impl<'a> Transaction<'a> {
(":sample_file_bytes", &r.sample_file_bytes),
(":start_time_90k", &r.time.start.0),
(":duration_90k", &(r.time.end.0 - r.time.start.0)),
(":local_time_delta_90k", &(r.local_time.0 - r.time.start.0)),
(":local_time_delta_90k", &r.local_time_delta.0),
(":video_samples", &r.video_samples),
(":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id),
@ -1460,7 +1460,7 @@ mod tests {
run_offset: 0,
flags: 0,
time: start .. start + recording::Duration(TIME_UNITS_PER_SEC),
local_time: start,
local_time_delta: recording::Duration(0),
video_samples: 1,
video_sync_samples: 1,
video_sample_entry_id: vse_id,

View File

@ -438,6 +438,8 @@ struct InnerWriter<'a> {
/// information. This will be used as the official start time iff `prev_end` is None.
local_start: Option<recording::Time>,
adjuster: ClockAdjuster,
camera_id: i32,
video_sample_entry_id: i32,
run_offset: i32,
@ -450,6 +452,51 @@ struct InnerWriter<'a> {
unflushed_sample: Option<UnflushedSample>,
}
/// Adjusts durations given by the camera to correct its clock frequency error.
struct ClockAdjuster {
/// Every `every_minus_1 + 1` units, add `-ndir`.
/// Note i32::max_value() disables adjustment.
every_minus_1: i32,
/// Should be 1 or -1 (unless disabled).
ndir: i32,
/// Keeps accumulated difference from previous values.
cur: i32,
}
impl ClockAdjuster {
fn new(local_time_delta: Option<i64>) -> Self {
// Correct up to 500 ppm, or 2,700/90,000ths of a second over the course of a minute.
let (every, ndir) = match local_time_delta {
None | Some(0) => (i32::max_value(), 0),
Some(d) if d <= -2700 => (2000, 1),
Some(d) if d >= 2700 => (2000, -1),
Some(d) if d < -60 => ((60 * 90000) / -(d as i32), 1),
Some(d) => ((60 * 90000) / (d as i32), -1),
};
ClockAdjuster{
every_minus_1: every - 1,
ndir: ndir,
cur: 0,
}
}
fn adjust(&mut self, mut val: i32) -> i32 {
self.cur += val;
// The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
// cause a duration of 1 to become a duration of 0. It has no effect when increasing
// durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
// be newly > self.every_minus_1.)
while self.cur > self.every_minus_1 && val > self.ndir {
val -= self.ndir;
self.cur -= self.every_minus_1 + 1;
}
val
}
}
struct UnflushedSample {
local_time: recording::Time,
pts_90k: i64,
@ -460,6 +507,7 @@ struct UnflushedSample {
#[derive(Copy, Clone)]
pub struct PreviousWriter {
end_time: recording::Time,
local_time_delta: recording::Duration,
run_offset: i32,
}
@ -476,9 +524,10 @@ impl<'a> Writer<'a> {
hasher: hash::Hasher::new(hash::Type::SHA1)?,
prev_end: prev.map(|p| p.end_time),
local_start: None,
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id,
run_offset: prev.map(|p| p.run_offset).unwrap_or(0),
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
unflushed_sample: None,
})))
}
@ -489,7 +538,7 @@ impl<'a> Writer<'a> {
is_key: bool) -> Result<(), Error> {
let w = self.0.as_mut().unwrap();
if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k) as i32;
let duration = w.adjuster.adjust((pts_90k - unflushed.pts_90k) as i32);
w.index.add_sample(duration, unflushed.len, unflushed.is_key);
w.local_start = Some(w.extend_local_start(unflushed.local_time));
}
@ -544,23 +593,24 @@ impl<'a> InnerWriter<'a> {
}
let unflushed =
self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?;
let duration = match next_pts {
let duration = self.adjuster.adjust(match next_pts {
None => 0,
Some(p) => (p - unflushed.pts_90k) as i32,
};
});
self.index.add_sample(duration, unflushed.len, unflushed.is_key);
let local_start = self.extend_local_start(unflushed.local_time);
let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]);
let start_time = self.prev_end.unwrap_or(local_start);
let end = start_time + recording::Duration(self.index.total_duration_90k as i64);
let start = self.prev_end.unwrap_or(local_start);
let end = start + recording::Duration(self.index.total_duration_90k as i64);
let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 }
else { 0 };
let local_start_delta = local_start - start;
let recording = db::RecordingToInsert{
camera_id: self.camera_id,
sample_file_bytes: self.index.sample_file_bytes,
time: start_time .. end,
local_time: local_start,
time: start .. end,
local_time_delta: local_start_delta,
video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id,
@ -573,6 +623,7 @@ impl<'a> InnerWriter<'a> {
self.syncer_channel.async_save_recording(recording, self.f);
Ok(PreviousWriter{
end_time: end,
local_time_delta: local_start_delta,
run_offset: self.run_offset,
})
}
@ -588,3 +639,48 @@ impl<'a> Drop for Writer<'a> {
}
}
}
#[cfg(test)]
mod tests {
use super::ClockAdjuster;
use testutil;
#[test]
fn adjust() {
testutil::init();
// no-ops.
let mut a = ClockAdjuster::new(None);
for _ in 0..1800 {
assert_eq!(3000, a.adjust(3000));
}
a = ClockAdjuster::new(Some(0));
for _ in 0..1800 {
assert_eq!(3000, a.adjust(3000));
}
// typical, 100 ppm adjustment.
a = ClockAdjuster::new(Some(-540));
let mut total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2999 || new == 3000);
total += new;
}
let expected = 1800*3000 - 540;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
// capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
a = ClockAdjuster::new(Some(-1_000_000));
total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2998 || new == 2999, "new={}", new);
total += new;
}
let expected = 1800*3000 - 2700;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
}
}

View File

@ -181,7 +181,7 @@ fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver<chan_sign
let env = streamer::Environment{
db: &db,
dir: &dir,
clock: &clock::REAL,
clocks: &clock::REAL,
opener: &*stream::FFMPEG,
shutdown: &shutdown,
};

View File

@ -28,7 +28,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use clock::Clock;
use clock::Clocks;
use db::{Camera, Database};
use dir;
use error::Error;
@ -43,15 +43,15 @@ use time;
pub static ROTATE_INTERVAL_SEC: i64 = 60;
/// Common state that can be used by multiple `Streamer` instances.
pub struct Environment<'a, 'b, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub clock: &'a C,
pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
pub clocks: &'a C,
pub opener: &'a stream::Opener<S>,
pub db: &'b Arc<Database>,
pub dir: &'b Arc<dir::SampleFileDir>,
pub shutdown: &'b Arc<AtomicBool>,
}
pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
shutdown: Arc<AtomicBool>,
// State below is only used by the thread in Run.
@ -60,7 +60,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
db: Arc<Database>,
dir: Arc<dir::SampleFileDir>,
syncer_channel: dir::SyncerChannel,
clock: &'a C,
clocks: &'a C,
opener: &'a stream::Opener<S>,
camera_id: i32,
short_name: String,
@ -75,7 +75,7 @@ struct WriterState<'a> {
rotate: i64,
}
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel,
camera_id: i32, c: &Camera, rotate_offset_sec: i64,
rotate_interval_sec: i64) -> Self {
@ -86,7 +86,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
db: env.db.clone(),
dir: env.dir.clone(),
syncer_channel: syncer_channel,
clock: env.clock,
clocks: env.clocks,
opener: env.opener,
camera_id: camera_id,
short_name: c.short_name.to_owned(),
@ -102,7 +102,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
if let Err(e) = self.run_once() {
let sleep_time = time::Duration::seconds(1);
warn!("{}: sleeping for {:?} after error: {}", self.short_name, sleep_time, e);
self.clock.sleep(sleep_time);
self.clocks.sleep(sleep_time);
}
}
info!("{}: shutting down", self.short_name);
@ -112,6 +112,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
info!("{}: Opening input: {}", self.short_name, self.redacted_url);
let mut stream = self.opener.open(stream::Source::Rtsp(&self.url))?;
let realtime_offset = self.clocks.realtime() - self.clocks.monotonic();
// TODO: verify time base.
// TODO: verify width/height.
let extra_data = stream.get_extra_data()?;
@ -132,7 +133,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
debug!("{}: have first key frame", self.short_name);
seen_key_frame = true;
}
let frame_realtime = self.clock.get_time();
let frame_realtime = self.clocks.monotonic() + realtime_offset;
let local_time = recording::Time::new(frame_realtime);
state = if let Some(s) = state {
if frame_realtime.sec > s.rotate && pkt.is_key() {
@ -185,7 +186,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
#[cfg(test)]
mod tests {
use clock::{self, Clock};
use clock::{self, Clocks};
use db;
use error::Error;
use ffmpeg;
@ -200,7 +201,7 @@ mod tests {
use time;
struct ProxyingStream<'a> {
clock: &'a clock::SimulatedClock,
clocks: &'a clock::SimulatedClocks,
inner: stream::FfmpegStream,
buffered: time::Duration,
slept: time::Duration,
@ -210,11 +211,11 @@ mod tests {
}
impl<'a> ProxyingStream<'a> {
fn new(clock: &'a clock::SimulatedClock, buffered: time::Duration,
fn new(clocks: &'a clock::SimulatedClocks, buffered: time::Duration,
inner: stream::FfmpegStream) -> ProxyingStream {
clock.sleep(buffered);
clocks.sleep(buffered);
ProxyingStream {
clock: clock,
clocks: clocks,
inner: inner,
buffered: buffered,
slept: time::Duration::seconds(0),
@ -244,7 +245,7 @@ mod tests {
let duration = goal - self.slept;
let buf_part = cmp::min(self.buffered, duration);
self.buffered = self.buffered - buf_part;
self.clock.sleep(duration - buf_part);
self.clocks.sleep(duration - buf_part);
self.slept = goal;
}
@ -321,11 +322,12 @@ mod tests {
#[test]
fn basic() {
testutil::init();
let clock = clock::SimulatedClock::new();
// 2015-04-25 00:00:00 UTC
let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
clocks.sleep(time::Duration::seconds(86400)); // to 2015-04-26 00:00:00 UTC
clock.sleep(time::Duration::seconds(1430006400)); // 2015-04-26 00:00:00 UTC
let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap();
let mut stream = ProxyingStream::new(&clock, time::Duration::seconds(2), stream);
let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream);
stream.ts_offset = 180000; // starting pts of the input should be irrelevant
stream.ts_offset_pkts_left = u32::max_value();
stream.pkts_left = u32::max_value();
@ -336,7 +338,7 @@ mod tests {
};
let db = testutil::TestDb::new();
let env = super::Environment{
clock: &clock,
clocks: &clocks,
opener: &opener,
db: &db.db,
dir: &db.dir,

View File

@ -131,7 +131,7 @@ impl TestDb {
sample_file_bytes: encoder.sample_file_bytes,
time: START_TIME ..
START_TIME + recording::Duration(encoder.total_duration_90k as i64),
local_time: START_TIME,
local_time_delta: recording::Duration(0),
video_samples: encoder.video_samples,
video_sync_samples: encoder.video_sync_samples,
video_sample_entry_id: video_sample_entry_id,