new logic for calculating a recording's start time

This is as described in design/time.md. Other aspects of that design
(including using the monotonic clock and adjusting the durations to compensate
for camera clock frequency error) are not implemented yet. No new tests yet.
Just trying to get some flight miles on these ideas as soon as I can.
This commit is contained in:
Scott Lamb 2016-12-28 20:56:08 -08:00
parent 063708c9ab
commit d001e4893c
3 changed files with 95 additions and 46 deletions

View File

@ -37,6 +37,7 @@ use error::Error;
use libc; use libc;
use recording; use recording;
use openssl::crypto::hash; use openssl::crypto::hash;
use std::cmp;
use std::ffi; use std::ffi;
use std::fs; use std::fs;
use std::io::{self, Write}; use std::io::{self, Write};
@ -120,8 +121,12 @@ impl SampleFileDir {
/// Note this doesn't wait for previous rotation to complete; it's assumed the sample file /// Note this doesn't wait for previous rotation to complete; it's assumed the sample file
/// directory has sufficient space for a couple recordings per camera in addition to the /// directory has sufficient space for a couple recordings per camera in addition to the
/// cameras' total `retain_bytes`. /// cameras' total `retain_bytes`.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time, ///
local_start: recording::Time, run_offset: i32, camera_id: i32, /// The new recording will start at `prev_end` if specified; this should be the end time
/// of the previous segment of the same run. `run_offset` should be the offset of this segment
/// within the run; 0 iff `prev_end` is None.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev_end: Option<recording::Time>,
run_offset: i32, camera_id: i32,
video_sample_entry_id: i32) -> Result<Writer<'a>, Error> { video_sample_entry_id: i32) -> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last // Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their // writer was created, and syncs ensure `next_uuid` is filled while performing their
@ -145,8 +150,7 @@ impl SampleFileDir {
return Err(e.into()); return Err(e.into());
}, },
}; };
Writer::open(f, uuid, start, local_start, run_offset, camera_id, video_sample_entry_id, Writer::open(f, uuid, prev_end, run_offset, camera_id, video_sample_entry_id, channel)
channel)
} }
/// Opens a sample file within this directory with the given flags and (if creating) mode. /// Opens a sample file within this directory with the given flags and (if creating) mode.
@ -425,8 +429,16 @@ struct InnerWriter<'a> {
uuid: Uuid, uuid: Uuid,
corrupt: bool, corrupt: bool,
hasher: hash::Hasher, hasher: hash::Hasher,
start_time: recording::Time,
local_time: recording::Time, /// The end time of the previous segment in this run, if any.
prev_end: Option<recording::Time>,
/// The start time of this segment, based solely on examining the local clock after frames in
/// this segment were received. This is initially `None`, filled in on the second packet in
/// the segment, and refined as more packets are received. See `design/time.md` for more
/// information. This will be used as the official start time iff `prev_end` is None.
local_start: Option<recording::Time>,
camera_id: i32, camera_id: i32,
video_sample_entry_id: i32, video_sample_entry_id: i32,
run_offset: i32, run_offset: i32,
@ -440,6 +452,7 @@ struct InnerWriter<'a> {
} }
struct UnflushedSample { struct UnflushedSample {
local_time: recording::Time,
pts_90k: i64, pts_90k: i64,
len: i32, len: i32,
is_key: bool, is_key: bool,
@ -447,7 +460,7 @@ struct UnflushedSample {
impl<'a> Writer<'a> { impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`). /// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time, fn open(f: fs::File, uuid: Uuid, prev_end: Option<recording::Time>,
run_offset: i32, camera_id: i32, video_sample_entry_id: i32, run_offset: i32, camera_id: i32, video_sample_entry_id: i32,
syncer_channel: &'a SyncerChannel) -> Result<Self, Error> { syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
Ok(Writer(Some(InnerWriter{ Ok(Writer(Some(InnerWriter{
@ -457,8 +470,8 @@ impl<'a> Writer<'a> {
uuid: uuid, uuid: uuid,
corrupt: false, corrupt: false,
hasher: hash::Hasher::new(hash::Type::SHA1)?, hasher: hash::Hasher::new(hash::Type::SHA1)?,
start_time: start_time, prev_end: prev_end,
local_time: local_time, local_start: None,
camera_id: camera_id, camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id, video_sample_entry_id: video_sample_entry_id,
run_offset: run_offset, run_offset: run_offset,
@ -466,11 +479,15 @@ impl<'a> Writer<'a> {
}))) })))
} }
pub fn write(&mut self, pkt: &[u8], pts_90k: i64, is_key: bool) -> Result<(), Error> { /// Writes a new frame to this segment.
/// `local_time` should be the local clock's time as of when this packet was received.
pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
is_key: bool) -> Result<(), Error> {
let w = self.0.as_mut().unwrap(); let w = self.0.as_mut().unwrap();
if let Some(unflushed) = w.unflushed_sample.take() { if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k) as i32; let duration = (pts_90k - unflushed.pts_90k) as i32;
w.index.add_sample(duration, unflushed.len, unflushed.is_key); w.index.add_sample(duration, unflushed.len, unflushed.is_key);
w.local_start = Some(w.extend_local_start(unflushed.local_time));
} }
let mut remaining = pkt; let mut remaining = pkt;
while !remaining.is_empty() { while !remaining.is_empty() {
@ -491,6 +508,7 @@ impl<'a> Writer<'a> {
remaining = &remaining[written..]; remaining = &remaining[written..];
} }
w.unflushed_sample = Some(UnflushedSample{ w.unflushed_sample = Some(UnflushedSample{
local_time: local_time,
pts_90k: pts_90k, pts_90k: pts_90k,
len: pkt.len() as i32, len: pkt.len() as i32,
is_key: is_key}); is_key: is_key});
@ -507,26 +525,36 @@ impl<'a> Writer<'a> {
} }
impl<'a> InnerWriter<'a> { impl<'a> InnerWriter<'a> {
fn extend_local_start(&mut self, pkt_local_time: recording::Time) -> recording::Time {
let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64);
match self.local_start {
None => new,
Some(old) => cmp::min(old, new),
}
}
fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> { fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> {
if self.corrupt { if self.corrupt {
self.syncer_channel.async_abandon_recording(self.uuid); self.syncer_channel.async_abandon_recording(self.uuid);
return Err(Error::new(format!("recording {} is corrupt", self.uuid))); return Err(Error::new(format!("recording {} is corrupt", self.uuid)));
} }
if let Some(unflushed) = self.unflushed_sample.take() { let unflushed =
self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?;
let duration = match next_pts { let duration = match next_pts {
None => 0, None => 0,
Some(p) => (p - unflushed.pts_90k) as i32, Some(p) => (p - unflushed.pts_90k) as i32,
}; };
self.index.add_sample(duration, unflushed.len, unflushed.is_key); self.index.add_sample(duration, unflushed.len, unflushed.is_key);
} let local_start = self.extend_local_start(unflushed.local_time);
let mut sha1_bytes = [0u8; 20]; let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]);
let end = self.start_time + recording::Duration(self.index.total_duration_90k as i64); let start_time = self.prev_end.unwrap_or(local_start);
let end = start_time + recording::Duration(self.index.total_duration_90k as i64);
let recording = db::RecordingToInsert{ let recording = db::RecordingToInsert{
camera_id: self.camera_id, camera_id: self.camera_id,
sample_file_bytes: self.index.sample_file_bytes, sample_file_bytes: self.index.sample_file_bytes,
time: self.start_time .. end, time: start_time .. end,
local_time: self.local_time, local_time: local_start,
video_samples: self.index.video_samples, video_samples: self.index.video_samples,
video_sync_samples: self.index.video_sync_samples, video_sync_samples: self.index.video_sync_samples,
video_sample_entry_id: self.video_sample_entry_id, video_sample_entry_id: self.video_sample_entry_id,

View File

@ -1408,7 +1408,7 @@ mod tests {
let extra_data = input.get_extra_data().unwrap(); let extra_data = input.get_extra_data().unwrap();
let video_sample_entry_id = db.db.lock().insert_video_sample_entry( let video_sample_entry_id = db.db.lock().insert_video_sample_entry(
extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap(); extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap();
let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, 0, let mut output = db.dir.create_writer(&db.syncer_channel, None, 0,
TEST_CAMERA_ID, video_sample_entry_id).unwrap(); TEST_CAMERA_ID, video_sample_entry_id).unwrap();
// end_pts is the pts of the end of the most recent frame (start + duration). // end_pts is the pts of the end of the most recent frame (start + duration).
@ -1417,15 +1417,20 @@ mod tests {
// To write the final packet of this sample .mp4 with a full duration, we need to fake a // To write the final packet of this sample .mp4 with a full duration, we need to fake a
// next packet's pts from the ffmpeg-supplied duration. // next packet's pts from the ffmpeg-supplied duration.
let mut end_pts = None; let mut end_pts = None;
let mut frame_time = START_TIME;
loop { loop {
let pkt = match input.get_next() { let pkt = match input.get_next() {
Ok(p) => p, Ok(p) => p,
Err(ffmpeg::Error::Eof) => { break; }, Err(ffmpeg::Error::Eof) => { break; },
Err(e) => { panic!("unexpected input error: {}", e); }, Err(e) => { panic!("unexpected input error: {}", e); },
}; };
output.write(pkt.data().expect("packet without data"), pkt.pts().unwrap(), let pts = pkt.pts().unwrap();
frame_time += recording::Duration(pkt.duration());
output.write(pkt.data().expect("packet without data"), frame_time, pts,
pkt.is_key()).unwrap(); pkt.is_key()).unwrap();
end_pts = Some(pkt.pts().unwrap() + pkt.duration()); end_pts = Some(pts + pkt.duration());
} }
output.close(end_pts).unwrap(); output.close(end_pts).unwrap();
db.syncer_channel.flush(); db.syncer_channel.flush();

View File

@ -68,6 +68,13 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
redacted_url: String, redacted_url: String,
} }
struct WriterState<'a> {
writer: dir::Writer<'a>,
/// Seconds since epoch at which to next rotate.
rotate: i64,
}
impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel,
camera_id: i32, c: &Camera, rotate_offset_sec: i64, camera_id: i32, c: &Camera, rotate_offset_sec: i64,
@ -113,10 +120,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
&extra_data.sample_entry)?; &extra_data.sample_entry)?;
debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id);
let mut seen_key_frame = false; let mut seen_key_frame = false;
let mut rotate = None; let mut state: Option<WriterState> = None;
let mut writer: Option<dir::Writer> = None;
let mut transformed = Vec::new(); let mut transformed = Vec::new();
let mut next_start = None; let mut prev_end = None;
let mut run_index = -1; let mut run_index = -1;
while !self.shutdown.load(Ordering::SeqCst) { while !self.shutdown.load(Ordering::SeqCst) {
let pkt = stream.get_next()?; let pkt = stream.get_next()?;
@ -128,27 +134,37 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
seen_key_frame = true; seen_key_frame = true;
} }
let frame_realtime = self.clock.get_time(); let frame_realtime = self.clock.get_time();
if let Some(r) = rotate { let local_time = recording::Time::new(frame_realtime);
if frame_realtime.sec > r && pkt.is_key() { state = if let Some(s) = state {
let w = writer.take().expect("rotate set implies writer is set"); if frame_realtime.sec > s.rotate && pkt.is_key() {
trace!("{}: write on normal rotation", self.short_name); trace!("{}: write on normal rotation", self.short_name);
next_start = Some(w.close(Some(pts))?); prev_end = Some(s.writer.close(Some(pts))?);
None
} else {
Some(s)
} }
}; } else { None };
let mut w = match writer { let mut s = match state {
Some(w) => w, Some(s) => s,
None => { None => {
let r = frame_realtime.sec - // Set rotate time to not the next rotate offset, but the one after.
(frame_realtime.sec % self.rotate_interval_sec) + // The first recording interval is longer than usual rather than shorter
self.rotate_offset_sec; // than usual so that there's plenty of frame times to use when calculating the
rotate = Some( // start time.
if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r }); let sec = frame_realtime.sec;
let local_realtime = recording::Time::new(frame_realtime); let r = sec - (sec % self.rotate_interval_sec) + self.rotate_offset_sec;
let r = r + if r <= sec { /*2**/self.rotate_interval_sec }
else { 0/*self.rotate_interval_sec*/ };
run_index += 1; run_index += 1;
self.dir.create_writer(&self.syncer_channel,
next_start.unwrap_or(local_realtime), local_realtime, let w = self.dir.create_writer(&self.syncer_channel, prev_end,
run_index, self.camera_id, video_sample_entry_id)? run_index, self.camera_id,
video_sample_entry_id)?;
WriterState{
writer: w,
rotate: r,
}
}, },
}; };
let orig_data = match pkt.data() { let orig_data = match pkt.data() {
@ -161,11 +177,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
} else { } else {
orig_data orig_data
}; };
w.write(transformed_data, pts, pkt.is_key())?; s.writer.write(transformed_data, local_time, pts, pkt.is_key())?;
writer = Some(w); state = Some(s);
} }
if let Some(w) = writer { if let Some(s) = state {
w.close(None)?; s.writer.close(None)?;
} }
Ok(()) Ok(())
} }