From fb4d88d3e28e808140bd4402254031af96869708 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 28 Feb 2018 12:32:52 -0800 Subject: [PATCH] make db::dir::Writer equally stubborn Every recording it starts must be sent to the syncer with at least one sample written. It will try forever (unless the channel is down, then panic). This avoids the situation in which it prevents something in the uncommitted VecDeque from ever being synced and thus any further recordings from being flushed. --- db/dir.rs | 160 ++++++++++++++++++++++++++++++------------------ src/mp4.rs | 6 +- src/streamer.rs | 44 +++++-------- 3 files changed, 121 insertions(+), 89 deletions(-) diff --git a/db/dir.rs b/db/dir.rs index 95368fe..aadef43 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -242,24 +242,6 @@ impl SampleFileDir { Ok(()) } - /// Creates a new writer. - /// Note this doesn't wait for previous rotation to complete; it's assumed the sample file - /// directory has sufficient space for a couple recordings per camera in addition to the - /// cameras' total `retain_bytes`. - /// - /// The new recording will continue from `prev` if specified; this should be as returned from - /// a previous `close` call. - pub fn create_writer<'a>(&'a self, db: &db::Database, channel: &'a SyncerChannel, - prev: Option, stream_id: i32, - video_sample_entry_id: i32) - -> Result, Error> { - let (id, r) = db.lock().add_recording(stream_id)?; - let p = SampleFileDir::get_rel_pathname(id); - let f = unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, - 0o600) }.unwrap(); // TODO: don't unwrap! - Writer::open(f, id, r, prev, video_sample_entry_id, channel) - } - pub fn statfs(&self) -> Result { self.fd.statfs() } /// Gets a pathname for a sample file suitable for passing to open or unlink. @@ -620,18 +602,30 @@ fn retry_forever>(f: &mut FnMut() -> Result) -> T { } } -/// Single-use struct to write a single recording to disk and commit its metadata to the database. -/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the -/// syncer when done. It either saves the recording to the database (if I/O errors do not prevent -/// this) or marks it as abandoned so that the syncer will attempt to unlink the file. -pub struct Writer<'a>(Option>); +/// Struct for writing a single run (of potentially several recordings) to disk and committing its +/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It +/// saves the recording to the database (if I/O errors do not prevent this), retries forever, +/// or panics (if further writing on this stream is impossible). +pub struct Writer<'a> { + dir: &'a SampleFileDir, + db: &'a db::Database, + channel: &'a SyncerChannel, + stream_id: i32, + video_sample_entry_id: i32, + state: WriterState, +} -/// The state associated with a `Writer`. The indirection is for the `Drop` trait; `close` moves -/// `f` and `index.video_index` out of the `InnerWriter`, which is not allowed on a struct with -/// a `Drop` trait. To avoid this problem, the real state is surrounded by an `Option`. The -/// `Option` should none only after close is called, and thus never in a way visible to callers. -struct InnerWriter<'a> { - syncer_channel: &'a SyncerChannel, +enum WriterState { + Unopened, + Open(InnerWriter), + Closed(PreviousWriter), +} + +/// State for writing a single recording, used within `Writer`. +/// +/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel` +/// with at least one sample. The sample may have zero duration. +struct InnerWriter { f: fs::File, r: Arc>, index: recording::SampleIndexEncoder, @@ -650,7 +644,6 @@ struct InnerWriter<'a> { adjuster: ClockAdjuster, - video_sample_entry_id: i32, run_offset: i32, /// A sample which has been written to disk but not added to `index`. Index writes are one @@ -658,6 +651,8 @@ struct InnerWriter<'a> { /// pts and the next sample's pts. A sample is flushed when the next sample is written, when /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). + /// + /// Invariant: this should always be `Some` (briefly violated during `write` call only). unflushed_sample: Option, } @@ -710,6 +705,7 @@ impl ClockAdjuster { } } +#[derive(Copy, Clone)] struct UnflushedSample { local_time: recording::Time, pts_90k: i64, @@ -717,20 +713,43 @@ struct UnflushedSample { is_key: bool, } +/// State associated with a run's previous recording; used within `Writer`. #[derive(Copy, Clone)] -pub struct PreviousWriter { +struct PreviousWriter { end_time: recording::Time, local_time_delta: recording::Duration, run_offset: i32, } impl<'a> Writer<'a> { - /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, id: CompositeId, r: Arc>, - prev: Option, - video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { - Ok(Writer(Some(InnerWriter { - syncer_channel, + pub fn new(dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel, + stream_id: i32, video_sample_entry_id: i32) -> Self { + Writer { + dir, + db, + channel, + stream_id, + video_sample_entry_id, + state: WriterState::Unopened, + } + } + + /// Opens a new writer. + /// This returns a writer that violates the invariant that `unflushed_sample` is `Some`. + /// The caller (`write`) is responsible for correcting this. + fn open(&mut self) -> Result<&mut InnerWriter, Error> { + let prev = match self.state { + WriterState::Unopened => None, + WriterState::Open(ref mut w) => return Ok(w), + WriterState::Closed(prev) => Some(prev), + }; + let (id, r) = self.db.lock().add_recording(self.stream_id)?; + let p = SampleFileDir::get_rel_pathname(id); + let f = retry_forever(&mut || unsafe { + self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) + }); + + self.state = WriterState::Open(InnerWriter { f, r, index: recording::SampleIndexEncoder::new(), @@ -739,20 +758,37 @@ impl<'a> Writer<'a> { prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - video_sample_entry_id, run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, - }))) + }); + match self.state { + WriterState::Open(ref mut w) => Ok(w), + _ => unreachable!(), + } + } + + pub fn previously_opened(&self) -> Result { + Ok(match self.state { + WriterState::Unopened => false, + WriterState::Closed(_) => true, + WriterState::Open(_) => bail!("open!"), + }) } /// Writes a new frame to this segment. /// `local_time` should be the local clock's time as of when this packet was received. pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64, is_key: bool) -> Result<(), Error> { - let w = self.0.as_mut().unwrap(); + let w = self.open()?; + + // Note w's invariant that `unflushed_sample` is `None` may currently be violated. + // We must restore it on all success or error paths. + if let Some(unflushed) = w.unflushed_sample.take() { let duration = (pts_90k - unflushed.pts_90k) as i32; if duration <= 0 { + // Restore invariant. + w.unflushed_sample = Some(unflushed); bail!("pts not monotonically increasing; got {} then {}", unflushed.pts_90k, pts_90k); } @@ -765,33 +801,39 @@ impl<'a> Writer<'a> { let written = retry_forever(&mut || w.f.write(remaining)); remaining = &remaining[written..]; } - w.unflushed_sample = Some(UnflushedSample{ - local_time: local_time, - pts_90k: pts_90k, + w.unflushed_sample = Some(UnflushedSample { + local_time, + pts_90k, len: pkt.len() as i32, - is_key: is_key}); - w.hasher.update(pkt)?; + is_key, + }); + w.hasher.update(pkt).unwrap(); Ok(()) } /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// swallowing errors and using a zero duration for the last sample. - pub fn close(mut self, next_pts: Option) -> Result { - self.0.take().unwrap().close(next_pts) + pub fn close(&mut self, next_pts: Option) { + self.state = match mem::replace(&mut self.state, WriterState::Unopened) { + WriterState::Open(w) => { + let prev = w.close(self.channel, self.video_sample_entry_id, next_pts); + WriterState::Closed(prev) + }, + s => s, + }; } } -impl<'a> InnerWriter<'a> { +impl InnerWriter { fn extend_local_start(&mut self, pkt_local_time: recording::Time) { let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64); self.local_start = cmp::min(self.local_start, new); } - fn close(mut self, next_pts: Option) -> Result { - let unflushed = - self.unflushed_sample.take() - .ok_or_else(|| format_err!("recording {} has no packets", self.id))?; + fn close(mut self, channel: &SyncerChannel, video_sample_entry_id: i32, + next_pts: Option) -> PreviousWriter { + let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); let duration = self.adjuster.adjust(match next_pts { None => 0, Some(p) => (p - unflushed.pts_90k) as i32, @@ -799,7 +841,7 @@ impl<'a> InnerWriter<'a> { self.index.add_sample(duration, unflushed.len, unflushed.is_key); self.extend_local_start(unflushed.local_time); let mut sha1_bytes = [0u8; 20]; - sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); + sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); let start = self.prev_end.unwrap_or(self.local_start); let end = start + recording::Duration(self.index.total_duration_90k as i64); let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } @@ -811,29 +853,29 @@ impl<'a> InnerWriter<'a> { local_time_delta: local_start_delta, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, - video_sample_entry_id: self.video_sample_entry_id, + video_sample_entry_id, video_index: self.index.video_index, sample_file_sha1: sha1_bytes, run_offset: self.run_offset, flags: flags, }; self.r.lock().recording = Some(recording); - self.syncer_channel.async_save_recording(self.id, self.r, self.f); - Ok(PreviousWriter{ + channel.async_save_recording(self.id, self.r, self.f); + PreviousWriter { end_time: end, local_time_delta: local_start_delta, run_offset: self.run_offset, - }) + } } } impl<'a> Drop for Writer<'a> { fn drop(&mut self) { - if let Some(w) = self.0.take() { + if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. - let _ = w.close(None); + let _ = w.close(self.channel, self.video_sample_entry_id, None); } } } diff --git a/src/mp4.rs b/src/mp4.rs index c089107..d56cf57 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1753,8 +1753,8 @@ mod tests { extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec).unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); - let mut output = dir.create_writer(&db.db, &db.syncer_channel, None, - TEST_STREAM_ID, video_sample_entry_id).unwrap(); + let mut output = dir::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID, + video_sample_entry_id); // end_pts is the pts of the end of the most recent frame (start + duration). // It's needed because dir::Writer calculates a packet's duration from its pts and the @@ -1777,7 +1777,7 @@ mod tests { pkt.is_key()).unwrap(); end_pts = Some(pts + pkt.duration() as i64); } - output.close(end_pts).unwrap(); + output.close(end_pts); db.syncer_channel.flush(); } diff --git a/src/streamer.rs b/src/streamer.rs index 6d16dc2..4166281 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -65,13 +65,6 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { redacted_url: String, } -struct WriterState<'a> { - writer: dir::Writer<'a>, - - /// Seconds since epoch at which to next rotate. - rotate: i64, -} - impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc, syncer_channel: dir::SyncerChannel, @@ -124,9 +117,12 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { }; debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); let mut seen_key_frame = false; - let mut state: Option = None; + + // Seconds since epoch at which to next rotate. + let mut rotate: Option = None; let mut transformed = Vec::new(); - let mut prev = None; + let mut w = dir::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id, + video_sample_entry_id); while !self.shutdown.load(Ordering::SeqCst) { let pkt = { let _t = TimerGuard::new(self.clocks, || "getting next packet"); @@ -141,18 +137,18 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { } let frame_realtime = self.clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); - state = if let Some(s) = state { - if frame_realtime.sec > s.rotate && pkt.is_key() { + rotate = if let Some(r) = rotate { + if frame_realtime.sec > r && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); let _t = TimerGuard::new(self.clocks, || "closing writer"); - prev = Some(s.writer.close(Some(pts))?); + w.close(Some(pts)); None } else { - Some(s) + Some(r) } } else { None }; - let mut s = match state { - Some(s) => s, + let r = match rotate { + Some(r) => r, None => { let sec = frame_realtime.sec; let r = sec - (sec % self.rotate_interval_sec) + self.rotate_offset_sec; @@ -162,15 +158,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // the one after, so that it's longer than usual rather than shorter than // usual. This ensures there's plenty of frame times to use when calculating // the start time. - let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 }; - + let r = r + if w.previously_opened()? { 0 } else { self.rotate_interval_sec }; let _t = TimerGuard::new(self.clocks, || "creating writer"); - let w = self.dir.create_writer(&self.db, &self.syncer_channel, prev, - self.stream_id, video_sample_entry_id)?; - WriterState{ - writer: w, - rotate: r, - } + r }, }; let orig_data = match pkt.data() { @@ -185,12 +175,12 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { }; let _t = TimerGuard::new(self.clocks, || format!("writing {} bytes", transformed_data.len())); - s.writer.write(transformed_data, local_time, pts, pkt.is_key())?; - state = Some(s); + w.write(transformed_data, local_time, pts, pkt.is_key())?; + rotate = Some(r); } - if let Some(s) = state { + if rotate.is_some() { let _t = TimerGuard::new(self.clocks, || "closing writer"); - s.writer.close(None)?; + w.close(None); } Ok(()) }