diff --git a/src/dir.rs b/src/dir.rs index 291714a..b640b94 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -37,6 +37,7 @@ use error::Error; use libc; use recording; use openssl::crypto::hash; +use std::cmp; use std::ffi; use std::fs; use std::io::{self, Write}; @@ -120,8 +121,12 @@ impl SampleFileDir { /// Note this doesn't wait for previous rotation to complete; it's assumed the sample file /// directory has sufficient space for a couple recordings per camera in addition to the /// cameras' total `retain_bytes`. - pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time, - local_start: recording::Time, run_offset: i32, camera_id: i32, + /// + /// The new recording will start at `prev_end` if specified; this should be the end time + /// of the previous segment of the same run. `run_offset` should be the offset of this segment + /// within the run; 0 iff `prev_end` is None. + pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev_end: Option, + run_offset: i32, camera_id: i32, video_sample_entry_id: i32) -> Result, Error> { // Grab the next uuid. Typically one is cached—a sync has usually completed since the last // writer was created, and syncs ensure `next_uuid` is filled while performing their @@ -145,8 +150,7 @@ impl SampleFileDir { return Err(e.into()); }, }; - Writer::open(f, uuid, start, local_start, run_offset, camera_id, video_sample_entry_id, - channel) + Writer::open(f, uuid, prev_end, run_offset, camera_id, video_sample_entry_id, channel) } /// Opens a sample file within this directory with the given flags and (if creating) mode. @@ -425,8 +429,16 @@ struct InnerWriter<'a> { uuid: Uuid, corrupt: bool, hasher: hash::Hasher, - start_time: recording::Time, - local_time: recording::Time, + + /// The end time of the previous segment in this run, if any. + prev_end: Option, + + /// The start time of this segment, based solely on examining the local clock after frames in + /// this segment were received. This is initially `None`, filled in on the second packet in + /// the segment, and refined as more packets are received. See `design/time.md` for more + /// information. This will be used as the official start time iff `prev_end` is None. + local_start: Option, + camera_id: i32, video_sample_entry_id: i32, run_offset: i32, @@ -440,6 +452,7 @@ struct InnerWriter<'a> { } struct UnflushedSample { + local_time: recording::Time, pts_90k: i64, len: i32, is_key: bool, @@ -447,7 +460,7 @@ struct UnflushedSample { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time, + fn open(f: fs::File, uuid: Uuid, prev_end: Option, run_offset: i32, camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { Ok(Writer(Some(InnerWriter{ @@ -457,8 +470,8 @@ impl<'a> Writer<'a> { uuid: uuid, corrupt: false, hasher: hash::Hasher::new(hash::Type::SHA1)?, - start_time: start_time, - local_time: local_time, + prev_end: prev_end, + local_start: None, camera_id: camera_id, video_sample_entry_id: video_sample_entry_id, run_offset: run_offset, @@ -466,11 +479,15 @@ impl<'a> Writer<'a> { }))) } - pub fn write(&mut self, pkt: &[u8], pts_90k: i64, is_key: bool) -> Result<(), Error> { + /// Writes a new frame to this segment. + /// `local_time` should be the local clock's time as of when this packet was received. + pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64, + is_key: bool) -> Result<(), Error> { let w = self.0.as_mut().unwrap(); if let Some(unflushed) = w.unflushed_sample.take() { let duration = (pts_90k - unflushed.pts_90k) as i32; w.index.add_sample(duration, unflushed.len, unflushed.is_key); + w.local_start = Some(w.extend_local_start(unflushed.local_time)); } let mut remaining = pkt; while !remaining.is_empty() { @@ -491,6 +508,7 @@ impl<'a> Writer<'a> { remaining = &remaining[written..]; } w.unflushed_sample = Some(UnflushedSample{ + local_time: local_time, pts_90k: pts_90k, len: pkt.len() as i32, is_key: is_key}); @@ -507,26 +525,36 @@ impl<'a> Writer<'a> { } impl<'a> InnerWriter<'a> { + fn extend_local_start(&mut self, pkt_local_time: recording::Time) -> recording::Time { + let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64); + match self.local_start { + None => new, + Some(old) => cmp::min(old, new), + } + } + fn close(mut self, next_pts: Option) -> Result { if self.corrupt { self.syncer_channel.async_abandon_recording(self.uuid); return Err(Error::new(format!("recording {} is corrupt", self.uuid))); } - if let Some(unflushed) = self.unflushed_sample.take() { - let duration = match next_pts { - None => 0, - Some(p) => (p - unflushed.pts_90k) as i32, - }; - self.index.add_sample(duration, unflushed.len, unflushed.is_key); - } + let unflushed = + self.unflushed_sample.take().ok_or_else(|| Error::new("no packets!".to_owned()))?; + let duration = match next_pts { + None => 0, + Some(p) => (p - unflushed.pts_90k) as i32, + }; + self.index.add_sample(duration, unflushed.len, unflushed.is_key); + let local_start = self.extend_local_start(unflushed.local_time); let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); - let end = self.start_time + recording::Duration(self.index.total_duration_90k as i64); + let start_time = self.prev_end.unwrap_or(local_start); + let end = start_time + recording::Duration(self.index.total_duration_90k as i64); let recording = db::RecordingToInsert{ camera_id: self.camera_id, sample_file_bytes: self.index.sample_file_bytes, - time: self.start_time .. end, - local_time: self.local_time, + time: start_time .. end, + local_time: local_start, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, video_sample_entry_id: self.video_sample_entry_id, diff --git a/src/mp4.rs b/src/mp4.rs index 344efa4..46cd7bd 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1408,7 +1408,7 @@ mod tests { let extra_data = input.get_extra_data().unwrap(); let video_sample_entry_id = db.db.lock().insert_video_sample_entry( extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap(); - let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, 0, + let mut output = db.dir.create_writer(&db.syncer_channel, None, 0, TEST_CAMERA_ID, video_sample_entry_id).unwrap(); // end_pts is the pts of the end of the most recent frame (start + duration). @@ -1417,15 +1417,20 @@ mod tests { // To write the final packet of this sample .mp4 with a full duration, we need to fake a // next packet's pts from the ffmpeg-supplied duration. let mut end_pts = None; + + let mut frame_time = START_TIME; + loop { let pkt = match input.get_next() { Ok(p) => p, Err(ffmpeg::Error::Eof) => { break; }, Err(e) => { panic!("unexpected input error: {}", e); }, }; - output.write(pkt.data().expect("packet without data"), pkt.pts().unwrap(), + let pts = pkt.pts().unwrap(); + frame_time += recording::Duration(pkt.duration()); + output.write(pkt.data().expect("packet without data"), frame_time, pts, pkt.is_key()).unwrap(); - end_pts = Some(pkt.pts().unwrap() + pkt.duration()); + end_pts = Some(pts + pkt.duration()); } output.close(end_pts).unwrap(); db.syncer_channel.flush(); diff --git a/src/streamer.rs b/src/streamer.rs index 2cee0a9..078019c 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -68,6 +68,13 @@ pub struct Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { redacted_url: String, } +struct WriterState<'a> { + writer: dir::Writer<'a>, + + /// Seconds since epoch at which to next rotate. + rotate: i64, +} + impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, camera_id: i32, c: &Camera, rotate_offset_sec: i64, @@ -113,10 +120,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { &extra_data.sample_entry)?; debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); let mut seen_key_frame = false; - let mut rotate = None; - let mut writer: Option = None; + let mut state: Option = None; let mut transformed = Vec::new(); - let mut next_start = None; + let mut prev_end = None; let mut run_index = -1; while !self.shutdown.load(Ordering::SeqCst) { let pkt = stream.get_next()?; @@ -128,27 +134,37 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { seen_key_frame = true; } let frame_realtime = self.clock.get_time(); - if let Some(r) = rotate { - if frame_realtime.sec > r && pkt.is_key() { - let w = writer.take().expect("rotate set implies writer is set"); + let local_time = recording::Time::new(frame_realtime); + state = if let Some(s) = state { + if frame_realtime.sec > s.rotate && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); - next_start = Some(w.close(Some(pts))?); + prev_end = Some(s.writer.close(Some(pts))?); + None + } else { + Some(s) } - }; - let mut w = match writer { - Some(w) => w, + } else { None }; + let mut s = match state { + Some(s) => s, None => { - let r = frame_realtime.sec - - (frame_realtime.sec % self.rotate_interval_sec) + - self.rotate_offset_sec; - rotate = Some( - if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r }); - let local_realtime = recording::Time::new(frame_realtime); + // Set rotate time to not the next rotate offset, but the one after. + // The first recording interval is longer than usual rather than shorter + // than usual so that there's plenty of frame times to use when calculating the + // start time. + let sec = frame_realtime.sec; + let r = sec - (sec % self.rotate_interval_sec) + self.rotate_offset_sec; + let r = r + if r <= sec { /*2**/self.rotate_interval_sec } + else { 0/*self.rotate_interval_sec*/ }; run_index += 1; - self.dir.create_writer(&self.syncer_channel, - next_start.unwrap_or(local_realtime), local_realtime, - run_index, self.camera_id, video_sample_entry_id)? + + let w = self.dir.create_writer(&self.syncer_channel, prev_end, + run_index, self.camera_id, + video_sample_entry_id)?; + WriterState{ + writer: w, + rotate: r, + } }, }; let orig_data = match pkt.data() { @@ -161,11 +177,11 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { } else { orig_data }; - w.write(transformed_data, pts, pkt.is_key())?; - writer = Some(w); + s.writer.write(transformed_data, local_time, pts, pkt.is_key())?; + state = Some(s); } - if let Some(w) = writer { - w.close(None)?; + if let Some(s) = state { + s.writer.close(None)?; } Ok(()) }