From 036e8427e6a2d35a992f84e91a6963d1d5be421a Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 6 Aug 2020 05:16:38 -0700 Subject: [PATCH] complete wall/media time split (for #34) --- db/recording.rs | 40 +++++------ db/testutil.rs | 1 + db/writer.rs | 181 ++++++++++++------------------------------------ design/time.md | 149 ++++++++++++++++++++++----------------- src/mp4.rs | 22 +++--- src/streamer.rs | 11 +-- 6 files changed, 165 insertions(+), 239 deletions(-) diff --git a/db/recording.rs b/db/recording.rs index 450c929..7b08a26 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -165,14 +165,9 @@ impl SampleIndexEncoder { } pub fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool, - r: &mut db::RecordingToInsert) -> Result<(), Error> { + r: &mut db::RecordingToInsert) { let duration_delta = duration_90k - self.prev_duration_90k; self.prev_duration_90k = duration_90k; - let new_duration_90k = r.wall_duration_90k + duration_90k; - if i64::from(new_duration_90k) > MAX_RECORDING_WALL_DURATION { - bail!("Duration {} exceeds maximum {}", new_duration_90k, MAX_RECORDING_WALL_DURATION); - } - r.wall_duration_90k += duration_90k; r.media_duration_90k += duration_90k; r.sample_file_bytes += bytes; r.video_samples += 1; @@ -188,7 +183,6 @@ impl SampleIndexEncoder { }; append_varint32((zigzag32(duration_delta) << 1) | (is_key as u32), &mut r.video_index); append_varint32(zigzag32(bytes_delta), &mut r.video_index); - Ok(()) } } @@ -380,11 +374,11 @@ mod tests { testutil::init(); let mut r = db::RecordingToInsert::default(); let mut e = SampleIndexEncoder::new(); - e.add_sample(10, 1000, true, &mut r).unwrap(); - e.add_sample(9, 10, false, &mut r).unwrap(); - e.add_sample(11, 15, false, &mut r).unwrap(); - e.add_sample(10, 12, false, &mut r).unwrap(); - e.add_sample(10, 1050, true, &mut r).unwrap(); + e.add_sample(10, 1000, true, &mut r); + e.add_sample(9, 10, false, &mut r); + e.add_sample(11, 15, false, &mut r); + e.add_sample(10, 12, false, &mut r); + 
e.add_sample(10, 1050, true, &mut r); assert_eq!(r.video_index, b"\x29\xd0\x0f\x02\x14\x08\x0a\x02\x05\x01\x64"); assert_eq!(10 + 9 + 11 + 10 + 10, r.media_duration_90k); assert_eq!(5, r.video_samples); @@ -411,7 +405,7 @@ mod tests { let mut r = db::RecordingToInsert::default(); let mut e = SampleIndexEncoder::new(); for sample in &samples { - e.add_sample(sample.duration_90k, sample.bytes, sample.is_key, &mut r).unwrap(); + e.add_sample(sample.duration_90k, sample.bytes, sample.is_key, &mut r); } let mut it = SampleIndexIterator::new(); for sample in &samples { @@ -468,7 +462,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, true, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, true, &mut r); } let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); @@ -487,7 +481,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); @@ -502,9 +496,9 @@ mod tests { testutil::init(); let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true, &mut r).unwrap(); - encoder.add_sample(1, 2, true, &mut r).unwrap(); - encoder.add_sample(0, 3, true, &mut r).unwrap(); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(1, 2, true, &mut r); + encoder.add_sample(0, 3, true, &mut r); let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 1 .. 
2).unwrap(); @@ -517,7 +511,7 @@ mod tests { testutil::init(); let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true, &mut r).unwrap(); + encoder.add_sample(1, 1, true, &mut r); let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap(); @@ -534,7 +528,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); @@ -547,9 +541,9 @@ mod tests { testutil::init(); let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true, &mut r).unwrap(); - encoder.add_sample(1, 2, true, &mut r).unwrap(); - encoder.add_sample(0, 3, true, &mut r).unwrap(); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(1, 2, true, &mut r); + encoder.add_sample(0, 3, true, &mut r); let db = TestDb::new(RealClocks {}); let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 
2).unwrap(); diff --git a/db/testutil.rs b/db/testutil.rs index 12c70bc..a13354d 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -157,6 +157,7 @@ impl TestDb { let (id, _) = db.add_recording(TEST_STREAM_ID, db::RecordingToInsert { start: recording::Time(1430006400i64 * TIME_UNITS_PER_SEC), video_sample_entry_id, + wall_duration_90k: r.media_duration_90k, ..r }).unwrap(); db.mark_synced(id).unwrap(); diff --git a/db/writer.rs b/db/writer.rs index 1d28a02..9afacdf 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -35,7 +35,7 @@ use base::clock::{self, Clocks}; use crate::db::{self, CompositeId}; use crate::dir; -use crate::recording; +use crate::recording::{self, MAX_RECORDING_WALL_DURATION}; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; use parking_lot::Mutex; @@ -240,8 +240,8 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) { - self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap(); + fn async_save_recording(&self, id: CompositeId, wall_duration: recording::Duration, f: F) { + self.0.send(SyncerCommand::AsyncSaveRecording(id, wall_duration, f)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete, @@ -400,7 +400,7 @@ impl Syncer { // Have a command; handle it. match cmd { - SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f), + SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => self.save(id, wall_dur, f), SyncerCommand::DatabaseFlushed => self.collect_garbage(), SyncerCommand::Flush(flush) => { // The sender is waiting for the supplied writer to be dropped. If there's no @@ -448,7 +448,7 @@ impl Syncer { /// so that there can be only one dir sync and database transaction per save. /// Internal helper for `save`. 
This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save(&mut self, id: CompositeId, duration: recording::Duration, f: D::File) { + fn save(&mut self, id: CompositeId, wall_duration: recording::Duration, f: D::File) { trace!("Processing save for {}", id); let stream_id = id.stream(); @@ -462,11 +462,11 @@ impl Syncer { let c = db.cameras_by_id().get(&s.camera_id).unwrap(); // Schedule a flush. - let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration(); + let how_soon = Duration::seconds(s.flush_if_sec) - wall_duration.to_tm_duration(); let now = self.db.clocks().monotonic(); let when = now + how_soon; let reason = format!("{} sec after start of {} {}-{} recording {}", - s.flush_if_sec, duration, c.short_name, s.type_.as_str(), id); + s.flush_if_sec, wall_duration, c.short_name, s.type_.as_str(), id); trace!("scheduling flush in {} because {}", how_soon, &reason); self.planned_flushes.push(PlannedFlush { when, @@ -571,8 +571,6 @@ struct InnerWriter { /// are discovered. See design/time.md for details. local_start: recording::Time, - adjuster: ClockAdjuster, - /// A sample which has been written to disk but not added to `index`. Index writes are one /// sample behind disk writes because the duration of a sample is the difference between its /// pts and the next sample's pts. A sample is flushed when the next sample is written, when @@ -583,55 +581,6 @@ struct InnerWriter { unindexed_sample: Option, } -/// Adjusts durations given by the camera to correct its clock frequency error. -#[derive(Copy, Clone, Debug)] -struct ClockAdjuster { - /// Every `every_minus_1 + 1` units, add `-ndir`. - /// Note i32::max_value() disables adjustment. - every_minus_1: i32, - - /// Should be 1 or -1 (unless disabled). - ndir: i32, - - /// Keeps accumulated difference from previous values. 
- cur: i32, -} - -impl ClockAdjuster { - fn new(local_time_delta: Option) -> Self { - // Pick an adjustment rate to correct local_time_delta over the next minute (the - // desired duration of a single recording). Cap the rate at 500 ppm (which corrects - // 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing - // down playback. - let (every_minus_1, ndir) = match local_time_delta { - Some(d) if d <= -2700 => (1999, 1), - Some(d) if d >= 2700 => (1999, -1), - Some(d) if d < -60 => ((60 * 90000) / -i32::try_from(d).unwrap() - 1, 1), - Some(d) if d > 60 => ((60 * 90000) / i32::try_from(d).unwrap() - 1, -1), - _ => (i32::max_value(), 0), - }; - ClockAdjuster{ - every_minus_1, - ndir, - cur: 0, - } - } - - fn adjust(&mut self, mut val: i32) -> i32 { - self.cur += val; - - // The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't - // cause a duration of 1 to become a duration of 0. It has no effect when increasing - // durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't - // be newly > self.every_minus_1.) 
- while self.cur > self.every_minus_1 && val > self.ndir { - val -= self.ndir; - self.cur -= self.every_minus_1 + 1; - } - val - } -} - #[derive(Copy, Clone)] struct UnindexedSample { local_time: recording::Time, @@ -644,7 +593,6 @@ struct UnindexedSample { #[derive(Copy, Clone)] struct PreviousWriter { end: recording::Time, - local_time_delta: recording::Duration, run_offset: i32, } @@ -689,7 +637,6 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { completed_live_segment_off_90k: 0, hasher: blake3::Hasher::new(), local_start: recording::Time(i64::max_value()), - adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), unindexed_sample: None, }); Ok(()) @@ -724,7 +671,6 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { bail!("pts not monotonically increasing; got {} then {}", unindexed.pts_90k, pts_90k); } - let duration = w.adjuster.adjust(duration); let d = match w.add_sample(duration, unindexed.len, unindexed.is_key, unindexed.local_time) { Ok(d) => d, @@ -775,29 +721,51 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { } } +fn clamp(v: i64, min: i64, max: i64) -> i64 { + std::cmp::min(std::cmp::max(v, min), max) +} + impl InnerWriter { /// Returns the total duration of the `RecordingToInsert` (needed for live view path). fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool, pkt_local_time: recording::Time) -> Result { let mut l = self.r.lock(); - self.e.add_sample(duration_90k, bytes, is_key, &mut l)?; - let new = pkt_local_time - recording::Duration(i64::from(l.media_duration_90k)); - self.local_start = cmp::min(self.local_start, new); - if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust. - l.start = self.local_start; + + // design/time.md explains these time manipulations in detail. 
+ let media_duration_90k = l.media_duration_90k + duration_90k; + let local_start = + cmp::min(self.local_start, + pkt_local_time - recording::Duration(i64::from(media_duration_90k))); + let limit = i64::from(media_duration_90k / 2000); // 1/2000th, aka 500 ppm. + let start = if l.run_offset == 0 { + // Start time isn't anchored to previous recording's end; adjust. + local_start + } else { + l.start + }; + let wall_duration_90k = + media_duration_90k + + i32::try_from(clamp(local_start.0 - start.0, -limit, limit)).unwrap(); + if wall_duration_90k > i32::try_from(MAX_RECORDING_WALL_DURATION).unwrap() { + bail!("Duration {} exceeds maximum {}", wall_duration_90k, + MAX_RECORDING_WALL_DURATION); } - Ok(l.media_duration_90k) + l.wall_duration_90k = wall_duration_90k; + l.start = start; + self.local_start = local_start; + self.e.add_sample(duration_90k, bytes, is_key, &mut l); + Ok(media_duration_90k) } fn close(mut self, channel: &SyncerChannel, next_pts: Option, db: &db::Database, stream_id: i32) -> Result { let unindexed = self.unindexed_sample.take().expect("should always be an unindexed sample"); let (last_sample_duration, flags) = match next_pts { - None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), - Some(p) => (self.adjuster.adjust(i32::try_from(p - unindexed.pts_90k)?), 0), + None => (0, db::RecordingFlags::TrailingZero as i32), + Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0), }; let blake3 = self.hasher.finalize(); - let (local_time_delta, run_offset, end); + let (run_offset, end); let d = self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key, unindexed.local_time)?; @@ -806,22 +774,20 @@ impl InnerWriter { recording: self.id.recording(), off_90k: self.completed_live_segment_off_90k .. 
d, }).unwrap(); - let total_duration; + let wall_duration; { let mut l = self.r.lock(); l.flags = flags; - local_time_delta = self.local_start - l.start; - l.local_time_delta = local_time_delta; + l.local_time_delta = self.local_start - l.start; l.sample_file_blake3 = Some(blake3.as_bytes().clone()); - total_duration = recording::Duration(i64::from(l.wall_duration_90k)); + wall_duration = recording::Duration(i64::from(l.wall_duration_90k)); run_offset = l.run_offset; - end = l.start + total_duration; + end = l.start + wall_duration; } drop(self.r); - channel.async_save_recording(self.id, total_duration, self.f); + channel.async_save_recording(self.id, wall_duration, self.f); Ok(PreviousWriter { end, - local_time_delta, run_offset, }) } @@ -853,7 +819,7 @@ mod tests { use std::io; use std::sync::Arc; use std::sync::mpsc; - use super::{ClockAdjuster, Writer}; + use super::Writer; use crate::testutil; #[derive(Clone)] @@ -1322,63 +1288,4 @@ mod tests { Some(std::sync::mpsc::TryRecvError::Disconnected)); assert!(h.syncer.planned_flushes.is_empty()); } - - #[test] - fn adjust() { - testutil::init(); - - // no-ops. - for v in &[None, Some(0), Some(-10), Some(10)] { - let mut a = ClockAdjuster::new(*v); - for _ in 0..1800 { - assert_eq!(3000, a.adjust(3000), "v={:?}", *v); - } - } - - // typical, 100 ppm adjustment. 
- let mut a = ClockAdjuster::new(Some(-540)); - let mut total = 0; - for _ in 0..1800 { - let new = a.adjust(3000); - assert!(new == 2999 || new == 3000); - total += new; - } - let expected = 1800*3000 - 540; - assert!(total == expected || total == expected + 1, "total={} vs expected={}", - total, expected); - - a = ClockAdjuster::new(Some(540)); - let mut total = 0; - for _ in 0..1800 { - let new = a.adjust(3000); - assert!(new == 3000 || new == 3001); - total += new; - } - let expected = 1800*3000 + 540; - assert!(total == expected || total == expected + 1, "total={} vs expected={}", - total, expected); - - // capped at 500 ppm (change of 2,700/90,000ths over 1 minute). - a = ClockAdjuster::new(Some(-1_000_000)); - total = 0; - for _ in 0..1800 { - let new = a.adjust(3000); - assert!(new == 2998 || new == 2999, "new={}", new); - total += new; - } - let expected = 1800*3000 - 2700; - assert!(total == expected || total == expected + 1, "total={} vs expected={}", - total, expected); - - a = ClockAdjuster::new(Some(1_000_000)); - total = 0; - for _ in 0..1800 { - let new = a.adjust(3000); - assert!(new == 3001 || new == 3002, "new={}", new); - total += new; - } - let expected = 1800*3000 + 2700; - assert!(total == expected || total == expected + 1, "total={} vs expected={}", - total, expected); - } } diff --git a/design/time.md b/design/time.md index e4dbd17..3afa79a 100644 --- a/design/time.md +++ b/design/time.md @@ -1,6 +1,6 @@ # Moonfire NVR Time Handling -Status: **in flux**. The approach below works well for video, but audio frames' +Status: **current**. The approach below works well for video, but audio frames' durations can't be adjusted as easily. As part of implementing audio support, the implementation is changing to instead decouple "wall time" and "media time", as described in @@ -33,9 +33,9 @@ from other sources: purpose of determining chronology, to the extent those persons use accurate clocks. 
-Two segments of video recorded from the same stream of the same camera should -not overlap. This would make it impossible for a user interface to present a -simple timeline for accessing all recorded video. +Two recordings from the same stream should not overlap. This would make it +impossible for a user interface to present a simple timeline for accessing all +recorded video. Durations should be useful over short timescales: @@ -103,20 +103,22 @@ information: interface to determine if the clock is currently synchronized. This document's author owns several cameras with clocks that run roughly 20 ppm fast (2 seconds per day) and are adjusted via steps. - * the RTP timestamps from each of a camera's streams. As described in [RFC - 3550 section 5.1](https://tools.ietf.org/html/rfc3550#section-5.1), these - are monotonically increasing with an unspecified reference point. They - can't be directly compared to other cameras or other streams from the - same camera. Emperically, budget cameras don't appear to do any frequency - correction on these timestamps. - * in some cases, RTCP sender reports, as described in [RFC 3550 section - 6.4](https://tools.ietf.org/html/rfc3550#section-6.4). These correlate - RTP timestamps with the camera's real time clock. However, these are only - sent periodically, not necessarily at the beginning of the session. - Some cameras omit them entirely depending on firmware version, as noted - in [this forum post](http://www.cctvforum.com/viewtopic.php). Additionally, - Moonfire NVR currently uses ffmpeg's libavformat for RTSP protocol - handling; this library exposes these reports in a limited fashion. + * the RTP timestamps from each of a camera's streams. As described in + [RFC 3550 section 5.1](https://tools.ietf.org/html/rfc3550#section-5.1), + these are monotonically increasing with an unspecified reference point. + They can't be directly compared to other cameras or other streams from + the same camera. 
Empirically, budget cameras don't appear to do any + frequency correction on these timestamps. + * in some cases, RTCP sender reports, as described in + [RFC 3550 section 6.4](https://tools.ietf.org/html/rfc3550#section-6.4). + These correlate RTP timestamps with the camera's real time clock. + However, these are only sent periodically, not necessarily at the + beginning of the session. Some cameras omit them entirely depending on + firmware version, as noted in + [this forum post](https://www.cctvforum.com/topic/40914-video-sync-with-hikvision-ipcams-tech-query-about-rtcp/). + Additionally, Moonfire NVR currently uses ffmpeg's libavformat for RTSP + protocol handling; this library exposes these reports in a limited + fashion. The camera records video frames as in the diagram below: @@ -134,17 +136,22 @@ from the timestamp of the following frame. This means that if a stream is terminated, the final frame has unknown duration. As described in [schema.md](schema.md), Moonfire NVR saves RTSP video streams -into roughly one-minute "recordings", with a fixed rotation offset after the +into roughly one-minute *recordings,* with a fixed rotation offset after the minute in the NVR's wall time. +See the [glossary](glossary.md) for additional terminology. Glossary terms +are italicized on first use. + ## Overview -Moonfire NVR will use the RTP timestamps to calculate video frames' durations. -For the first segment of video, it will trust these completely. It will use -them and the NVR's wall clock time to establish the start time of the -recording. For following segments, it will slightly adjust durations to -compensate for difference between the frequencies of the camera and NVR -clock, trusting the latter to be accurate. +Moonfire NVR will use the RTP timestamps to calculate video frames' durations, +relying on the camera's clock for the *media duration* of frames and +recordings. 
In the first recording in a *run*, it will use these durations +and the NVR's wall clock time to establish the start time of the run. In +subsequent recordings of the run, it will calculate a *wall duration* which +is up to 500 ppm different from the media duration to gently correct the +camera's clock toward the NVR's clock, trusting the latter to be more +accurate. ## Detailed design @@ -156,49 +163,55 @@ _local frame time_. Assuming the local clock is accurate, this time is an upper bound on when the frame was generated. The difference is the sum of the following items: - * H.264 encoding - * buffering on the camera (particularly when starting the stream—some - cameras apparently send frames that were captured before the RTSP session - was established) - * network transmission time +* H.264 encoding +* buffering on the camera (particularly when starting the stream—some + cameras apparently send frames that were captured before the RTSP session + was established) +* network transmission time -These values may produce some jitter, so the local frame time is not directly -used to calculate frame durations. Instead, they are primarily taken from -differences in RTP timestamps from one frame to the next. During the first -segment of video, these RTP timestamp differences are used directly, without -correcting for incorrect camera frequency. At the design limit of 500 ppm -camera frequency error, and an upper bound of two minutes of recording for the -initial segment (explained below), this causes a maximum of 60 milliseconds of -error. +The _local start time_ of a recording is calculated when ending it. It's +defined as the minimum for all frames of the local frame time minus the +duration of all previous frames. If there are many frames, this means neither +initial buffering nor spikes of delay in H.264 encoding or network +transmission cause the local start time to become inaccurate. The least +delayed frame wins. 
-The _local start time_ of a segment is calculated when ending it. It's defined -as the minimum for all frames of the local frame time minus the duration of -all previous frames. If there are many frames, this means neither initial -buffering nor spikes of delay in H.264 encoding or network transmission cause -the local start time to become inaccurate. The least delayed frame wins. +The start time of a recording is calculated as follows: -The first segment either ends with the RTSP session (due to error/shutdown) or -on rotation. In the former case, there may not be many samples to use in -calculating the local start time; accuracy may suffer but the system degrades -gracefully. Rotation doesn't happen until the second time the rotation offset -is passed, so rotation happens after 1–2 minutes rather than 0–1 minutes to -maximize accuracy. +* For the first recording in a *run*: the start time is the local start + time. +* For subsequent recordings: the start time is the end time of the previous + recording. -The _start time_ of the first segment is its local start time. The start time -of following segments is the end time of the previous segment. +The *media duration* of video and audio samples is simply taken from the RTP +timestamps. For video, this is superior to the local frame time because the +latter is vulnerable to jitter. For audio, this is the only realistic option; +it's infeasible to adjust the duration of audio samples. -The duration of following segments is adjusted to compensate for camera -frequency error, assuming the NVR clock's frequency is more trustworthy. This -is done as follows. The _local duration_ of segment _i_ is calculated as the -local start time of segment _i+1_ minus the local start time of segment _i_. -The _cumulative error_ as of segment _i_ is defined as the local duration of -all previous segments minus the duration of all previous segments. 
The -duration of segment _i_ should be adjusted by up to 500 ppm to eliminate -cumulative error. (For a one-minute segment, this is 0.3 ms, or 27 90kHz units.) -This correction should be spread out across the segment to minimize jitter. +The media duration of recordings and runs is simply taken from the media +durations of the samples they contain. -Each segment's local start time is also stored in the database as a delta to -the segment's start time. These stored values aren't for normal system +Over a long run, the start time plus the media duration may drift +significantly from the actual time samples were recorded because of +inaccuracies in the camera's clock. Therefore, Moonfire NVR also calculates +a *wall duration* of recordings which more closely matches the NVR's clock. +It is calculated as follows: + +* For the first recording in a run: the wall duration is the media duration. + At the design limit of 500 ppm camera frequency error and an upper + bound of two minutes duration for the initial recording, this causes + a maximum of 60 milliseconds of error. +* For subsequent recordings, the wall duration is the media duration + adjusted by up to 500 ppm to reduce differences between the "local start + time" and the start time, as follows: + ``` + limit = media_duration / 2000 + wall_duration = media_duration + clamp(local_start - start, -limit, +limit) + ``` + Note that for a 1-minute recording, 500 ppm is 30 ms, or 2,700 90kHz units. + +Each recording's local start time is also stored in the database as a delta to +the recording's start time. These stored values aren't used for normal system operation but may be handy in understanding and correcting errors. ## Caveats @@ -212,8 +225,8 @@ could be used in some circumstances: the _camera start time_. The first RTCP sender report could be used to correlate a RTP timestamp with the camera's wall clock, and thus calculate the camera's time as of the first frame. 
-The _start time_ of the first segment could be either its local start time or -its camera start time, determined via the following rules: +The _start time_ of the first recording could be either its local start time +or its camera start time, determined via the following rules: 1. if there is no camera start time (due to the lack of a RTCP sender report), the local start time wins by default. @@ -329,3 +342,11 @@ attempt to correct the time into TAI with a leap second table. This behavior would work well on a system with the expected configuration and produce surprising results on other systems. It's unfortunate that there's no standard way to determine if a system is using a leap smear and with what policy. + +## Alternatives considered + +Schema versions prior to 6 used a simpler database schema which didn't +distinguish between "wall" and "media" time. Instead, the durations of video +samples were adjusted for clock correction. This approach worked well for +video. It couldn't be extended to audio without decoding and re-encoding to +adjust sample lengths and pitch. diff --git a/src/mp4.rs b/src/mp4.rs index bee6ecf..99ec142 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -2056,7 +2056,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, true, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, true, &mut r); } // Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included. @@ -2110,7 +2110,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } // Time range [2+4+6, 2+4+6+8) means the 4th sample should be included. 
@@ -2179,14 +2179,14 @@ mod tests { let mut encoders = Vec::new(); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true, &mut r).unwrap(); - encoder.add_sample(2, 2, false, &mut r).unwrap(); - encoder.add_sample(3, 3, true, &mut r).unwrap(); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(2, 2, false, &mut r); + encoder.add_sample(3, 3, true, &mut r); encoders.push(r); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(4, 4, true, &mut r).unwrap(); - encoder.add_sample(5, 5, false, &mut r).unwrap(); + encoder.add_sample(4, 4, true, &mut r); + encoder.add_sample(5, 5, false, &mut r); encoders.push(r); // This should include samples 3 and 4 only, both sync frames. @@ -2216,12 +2216,12 @@ mod tests { let mut encoders = Vec::new(); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(2, 1, true, &mut r).unwrap(); - encoder.add_sample(3, 2, false, &mut r).unwrap(); + encoder.add_sample(2, 1, true, &mut r); + encoder.add_sample(3, 2, false, &mut r); encoders.push(r); let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(0, 3, true, &mut r).unwrap(); + encoder.add_sample(0, 3, true, &mut r); encoders.push(r); // Multi-segment recording with an edit list, encoding with a zero-duration recording. @@ -2244,7 +2244,7 @@ mod tests { for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r).unwrap(); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } // Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included. 
diff --git a/src/streamer.rs b/src/streamer.rs index 2a47126..369a2c8 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -217,6 +217,7 @@ mod tests { use log::trace; use parking_lot::Mutex; use std::cmp; + use std::convert::TryFrom; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use time; @@ -256,7 +257,8 @@ mod tests { let mut pkt = self.inner.get_next()?; - // Advance clock to the end of this frame. + // Emulate the behavior of real cameras that send some pre-buffered frames immediately + // on connect. After that, advance clock to the end of this frame. // Avoid accumulating conversion error by tracking the total amount to sleep and how // much we've already slept, rather than considering each frame in isolation. { @@ -278,8 +280,9 @@ mod tests { pkt.set_dts(old_dts + self.ts_offset); // In a real rtsp stream, the duration of a packet is not known until the - // next packet. ffmpeg's duration is an unreliable estimate. - pkt.set_duration(recording::TIME_UNITS_PER_SEC as i32); + // next packet. ffmpeg's duration is an unreliable estimate. Set it to something + // ridiculous. + pkt.set_duration(i32::try_from(3600 * recording::TIME_UNITS_PER_SEC).unwrap()); } Ok(pkt) @@ -350,7 +353,7 @@ mod tests { let stream = stream::FFMPEG.open(stream::Source::File("src/testdata/clip.mp4")).unwrap(); let mut stream = ProxyingStream::new(&clocks, time::Duration::seconds(2), stream); - stream.ts_offset = 180000; // starting pts of the input should be irrelevant + stream.ts_offset = 123456; // starting pts of the input should be irrelevant stream.ts_offset_pkts_left = u32::max_value(); stream.pkts_left = u32::max_value(); let opener = MockOpener{