clean up Writer interface slightly

This commit is contained in:
Scott Lamb 2016-12-29 12:33:34 -08:00
parent d001e4893c
commit cc297adc75
2 changed files with 25 additions and 22 deletions

View File

@ -122,12 +122,11 @@ impl SampleFileDir {
/// directory has sufficient space for a couple recordings per camera in addition to the /// directory has sufficient space for a couple recordings per camera in addition to the
/// cameras' total `retain_bytes`. /// cameras' total `retain_bytes`.
/// ///
/// The new recording will start at `prev_end` if specified; this should be the end time /// The new recording will continue from `prev` if specified; this should be as returned from
/// of the previous segment of the same run. `run_offset` should be the offset of this segment /// a previous `close` call.
/// within the run; 0 iff `prev_end` is None. pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev: Option<PreviousWriter>,
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, prev_end: Option<recording::Time>, camera_id: i32, video_sample_entry_id: i32)
run_offset: i32, camera_id: i32, -> Result<Writer<'a>, Error> {
video_sample_entry_id: i32) -> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last // Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their // writer was created, and syncs ensure `next_uuid` is filled while performing their
// transaction. But if not, perform an extra database transaction to reserve a new one. // transaction. But if not, perform an extra database transaction to reserve a new one.
@ -150,7 +149,7 @@ impl SampleFileDir {
return Err(e.into()); return Err(e.into());
}, },
}; };
Writer::open(f, uuid, prev_end, run_offset, camera_id, video_sample_entry_id, channel) Writer::open(f, uuid, prev, camera_id, video_sample_entry_id, channel)
} }
/// Opens a sample file within this directory with the given flags and (if creating) mode. /// Opens a sample file within this directory with the given flags and (if creating) mode.
@ -458,11 +457,16 @@ struct UnflushedSample {
is_key: bool, is_key: bool,
} }
#[derive(Copy, Clone)]
pub struct PreviousWriter {
end_time: recording::Time,
run_offset: i32,
}
impl<'a> Writer<'a> { impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`). /// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, prev_end: Option<recording::Time>, fn open(f: fs::File, uuid: Uuid, prev: Option<PreviousWriter>, camera_id: i32,
run_offset: i32, camera_id: i32, video_sample_entry_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
Ok(Writer(Some(InnerWriter{ Ok(Writer(Some(InnerWriter{
syncer_channel: syncer_channel, syncer_channel: syncer_channel,
f: f, f: f,
@ -470,11 +474,11 @@ impl<'a> Writer<'a> {
uuid: uuid, uuid: uuid,
corrupt: false, corrupt: false,
hasher: hash::Hasher::new(hash::Type::SHA1)?, hasher: hash::Hasher::new(hash::Type::SHA1)?,
prev_end: prev_end, prev_end: prev.map(|p| p.end_time),
local_start: None, local_start: None,
camera_id: camera_id, camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id, video_sample_entry_id: video_sample_entry_id,
run_offset: run_offset, run_offset: prev.map(|p| p.run_offset).unwrap_or(0),
unflushed_sample: None, unflushed_sample: None,
}))) })))
} }
@ -519,7 +523,7 @@ impl<'a> Writer<'a> {
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample. /// swallowing errors and using a zero duration for the last sample.
pub fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> { pub fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
self.0.take().unwrap().close(next_pts) self.0.take().unwrap().close(next_pts)
} }
} }
@ -533,7 +537,7 @@ impl<'a> InnerWriter<'a> {
} }
} }
fn close(mut self, next_pts: Option<i64>) -> Result<recording::Time, Error> { fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
if self.corrupt { if self.corrupt {
self.syncer_channel.async_abandon_recording(self.uuid); self.syncer_channel.async_abandon_recording(self.uuid);
return Err(Error::new(format!("recording {} is corrupt", self.uuid))); return Err(Error::new(format!("recording {} is corrupt", self.uuid)));
@ -565,7 +569,10 @@ impl<'a> InnerWriter<'a> {
flags: 0, // TODO flags: 0, // TODO
}; };
self.syncer_channel.async_save_recording(recording, self.f); self.syncer_channel.async_save_recording(recording, self.f);
Ok(end) Ok(PreviousWriter{
end_time: end,
run_offset: self.run_offset,
})
} }
} }

View File

@ -122,8 +122,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
let mut seen_key_frame = false; let mut seen_key_frame = false;
let mut state: Option<WriterState> = None; let mut state: Option<WriterState> = None;
let mut transformed = Vec::new(); let mut transformed = Vec::new();
let mut prev_end = None; let mut prev = None;
let mut run_index = -1;
while !self.shutdown.load(Ordering::SeqCst) { while !self.shutdown.load(Ordering::SeqCst) {
let pkt = stream.get_next()?; let pkt = stream.get_next()?;
let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?; let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?;
@ -138,7 +137,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
state = if let Some(s) = state { state = if let Some(s) = state {
if frame_realtime.sec > s.rotate && pkt.is_key() { if frame_realtime.sec > s.rotate && pkt.is_key() {
trace!("{}: write on normal rotation", self.short_name); trace!("{}: write on normal rotation", self.short_name);
prev_end = Some(s.writer.close(Some(pts))?); prev = Some(s.writer.close(Some(pts))?);
None None
} else { } else {
Some(s) Some(s)
@ -156,10 +155,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
let r = r + if r <= sec { /*2**/self.rotate_interval_sec } let r = r + if r <= sec { /*2**/self.rotate_interval_sec }
else { 0/*self.rotate_interval_sec*/ }; else { 0/*self.rotate_interval_sec*/ };
run_index += 1; let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id,
let w = self.dir.create_writer(&self.syncer_channel, prev_end,
run_index, self.camera_id,
video_sample_entry_id)?; video_sample_entry_id)?;
WriterState{ WriterState{
writer: w, writer: w,