diff --git a/db/dir.rs b/db/dir.rs index 95368fe..aadef43 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -242,24 +242,6 @@ impl SampleFileDir { Ok(()) } - /// Creates a new writer. - /// Note this doesn't wait for previous rotation to complete; it's assumed the sample file - /// directory has sufficient space for a couple recordings per camera in addition to the - /// cameras' total `retain_bytes`. - /// - /// The new recording will continue from `prev` if specified; this should be as returned from - /// a previous `close` call. - pub fn create_writer<'a>(&'a self, db: &db::Database, channel: &'a SyncerChannel, - prev: Option, stream_id: i32, - video_sample_entry_id: i32) - -> Result, Error> { - let (id, r) = db.lock().add_recording(stream_id)?; - let p = SampleFileDir::get_rel_pathname(id); - let f = unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, - 0o600) }.unwrap(); // TODO: don't unwrap! - Writer::open(f, id, r, prev, video_sample_entry_id, channel) - } - pub fn statfs(&self) -> Result { self.fd.statfs() } /// Gets a pathname for a sample file suitable for passing to open or unlink. @@ -620,18 +602,30 @@ fn retry_forever>(f: &mut FnMut() -> Result) -> T { } } -/// Single-use struct to write a single recording to disk and commit its metadata to the database. -/// Use `SampleFileDir::create_writer` to create a new writer. `Writer` hands off its state to the -/// syncer when done. It either saves the recording to the database (if I/O errors do not prevent -/// this) or marks it as abandoned so that the syncer will attempt to unlink the file. -pub struct Writer<'a>(Option>); +/// Struct for writing a single run (of potentially several recordings) to disk and committing its +/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It +/// saves the recording to the database (if I/O errors do not prevent this), retries forever, +/// or panics (if further writing on this stream is impossible). +pub struct Writer<'a> { + dir: &'a SampleFileDir, + db: &'a db::Database, + channel: &'a SyncerChannel, + stream_id: i32, + video_sample_entry_id: i32, + state: WriterState, +} -/// The state associated with a `Writer`. The indirection is for the `Drop` trait; `close` moves -/// `f` and `index.video_index` out of the `InnerWriter`, which is not allowed on a struct with -/// a `Drop` trait. To avoid this problem, the real state is surrounded by an `Option`. The -/// `Option` should none only after close is called, and thus never in a way visible to callers. -struct InnerWriter<'a> { - syncer_channel: &'a SyncerChannel, +enum WriterState { + Unopened, + Open(InnerWriter), + Closed(PreviousWriter), +} + +/// State for writing a single recording, used within `Writer`. +/// +/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel` +/// with at least one sample. The sample may have zero duration. +struct InnerWriter { f: fs::File, r: Arc>, index: recording::SampleIndexEncoder, @@ -650,7 +644,6 @@ struct InnerWriter<'a> { adjuster: ClockAdjuster, - video_sample_entry_id: i32, run_offset: i32, /// A sample which has been written to disk but not added to `index`. Index writes are one @@ -658,6 +651,8 @@ struct InnerWriter<'a> { /// pts and the next sample's pts. A sample is flushed when the next sample is written, when /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). + /// + /// Invariant: this should always be `Some` (briefly violated during `write` call only). unflushed_sample: Option, } @@ -710,6 +705,7 @@ impl ClockAdjuster { } } +#[derive(Copy, Clone)] struct UnflushedSample { local_time: recording::Time, pts_90k: i64, @@ -717,20 +713,43 @@ struct UnflushedSample { is_key: bool, } +/// State associated with a run's previous recording; used within `Writer`. #[derive(Copy, Clone)] -pub struct PreviousWriter { +struct PreviousWriter { end_time: recording::Time, local_time_delta: recording::Duration, run_offset: i32, } impl<'a> Writer<'a> { - /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, id: CompositeId, r: Arc>, - prev: Option, - video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { - Ok(Writer(Some(InnerWriter { - syncer_channel, + pub fn new(dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel, + stream_id: i32, video_sample_entry_id: i32) -> Self { + Writer { + dir, + db, + channel, + stream_id, + video_sample_entry_id, + state: WriterState::Unopened, + } + } + + /// Opens a new writer. + /// This returns a writer that violates the invariant that `unflushed_sample` is `Some`. + /// The caller (`write`) is responsible for correcting this. + fn open(&mut self) -> Result<&mut InnerWriter, Error> { + let prev = match self.state { + WriterState::Unopened => None, + WriterState::Open(ref mut w) => return Ok(w), + WriterState::Closed(prev) => Some(prev), + }; + let (id, r) = self.db.lock().add_recording(self.stream_id)?; + let p = SampleFileDir::get_rel_pathname(id); + let f = retry_forever(&mut || unsafe { + self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) + }); + + self.state = WriterState::Open(InnerWriter { f, r, index: recording::SampleIndexEncoder::new(), @@ -739,20 +758,37 @@ impl<'a> Writer<'a> { prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - video_sample_entry_id, run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, - }))) + }); + match self.state { + WriterState::Open(ref mut w) => Ok(w), + _ => unreachable!(), + } + } + + pub fn previously_opened(&self) -> Result { + Ok(match self.state { + WriterState::Unopened => false, + WriterState::Closed(_) => true, + WriterState::Open(_) => bail!("open!"), + }) } /// Writes a new frame to this segment. /// `local_time` should be the local clock's time as of when this packet was received. pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64, is_key: bool) -> Result<(), Error> { - let w = self.0.as_mut().unwrap(); + let w = self.open()?; + + // Note w's invariant that `unflushed_sample` is `None` may currently be violated. + // We must restore it on all success or error paths. + if let Some(unflushed) = w.unflushed_sample.take() { let duration = (pts_90k - unflushed.pts_90k) as i32; if duration <= 0 { + // Restore invariant. + w.unflushed_sample = Some(unflushed); bail!("pts not monotonically increasing; got {} then {}", unflushed.pts_90k, pts_90k); } @@ -765,33 +801,39 @@ impl<'a> Writer<'a> { let written = retry_forever(&mut || w.f.write(remaining)); remaining = &remaining[written..]; } - w.unflushed_sample = Some(UnflushedSample{ - local_time: local_time, - pts_90k: pts_90k, + w.unflushed_sample = Some(UnflushedSample { + local_time, + pts_90k, len: pkt.len() as i32, - is_key: is_key}); - w.hasher.update(pkt)?; + is_key, + }); + w.hasher.update(pkt).unwrap(); Ok(()) } /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait, /// swallowing errors and using a zero duration for the last sample. - pub fn close(mut self, next_pts: Option) -> Result { - self.0.take().unwrap().close(next_pts) + pub fn close(&mut self, next_pts: Option) { + self.state = match mem::replace(&mut self.state, WriterState::Unopened) { + WriterState::Open(w) => { + let prev = w.close(self.channel, self.video_sample_entry_id, next_pts); + WriterState::Closed(prev) + }, + s => s, + }; } } -impl<'a> InnerWriter<'a> { +impl InnerWriter { fn extend_local_start(&mut self, pkt_local_time: recording::Time) { let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64); self.local_start = cmp::min(self.local_start, new); } - fn close(mut self, next_pts: Option) -> Result { - let unflushed = - self.unflushed_sample.take() - .ok_or_else(|| format_err!("recording {} has no packets", self.id))?; + fn close(mut self, channel: &SyncerChannel, video_sample_entry_id: i32, + next_pts: Option) -> PreviousWriter { + let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); let duration = self.adjuster.adjust(match next_pts { None => 0, Some(p) => (p - unflushed.pts_90k) as i32, @@ -799,7 +841,7 @@ impl<'a> InnerWriter<'a> { self.index.add_sample(duration, unflushed.len, unflushed.is_key); self.extend_local_start(unflushed.local_time); let mut sha1_bytes = [0u8; 20]; - sha1_bytes.copy_from_slice(&self.hasher.finish()?[..]); + sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); let start = self.prev_end.unwrap_or(self.local_start); let end = start + recording::Duration(self.index.total_duration_90k as i64); let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } @@ -811,29 +853,29 @@ impl<'a> InnerWriter<'a> { local_time_delta: local_start_delta, video_samples: self.index.video_samples, video_sync_samples: self.index.video_sync_samples, - video_sample_entry_id: self.video_sample_entry_id, + video_sample_entry_id, video_index: self.index.video_index, sample_file_sha1: sha1_bytes, run_offset: self.run_offset, flags: flags, }; self.r.lock().recording = Some(recording); - self.syncer_channel.async_save_recording(self.id, self.r, self.f); - Ok(PreviousWriter{ + channel.async_save_recording(self.id, self.r, self.f); + PreviousWriter { end_time: end, local_time_delta: local_start_delta, run_offset: self.run_offset, - }) + } } } impl<'a> Drop for Writer<'a> { fn drop(&mut self) { - if let Some(w) = self.0.take() { + if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. - let _ = w.close(None); + let _ = w.close(self.channel, self.video_sample_entry_id, None); } } } diff --git a/src/mp4.rs b/src/mp4.rs index c089107..d56cf57 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1753,8 +1753,8 @@ mod tests { extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec).unwrap(); let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap(); - let mut output = dir.create_writer(&db.db, &db.syncer_channel, None, - TEST_STREAM_ID, video_sample_entry_id).unwrap(); + let mut output = dir::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID, + video_sample_entry_id); // end_pts is the pts of the end of the most recent frame (start + duration). // It's needed because dir::Writer calculates a packet's duration from its pts and the @@ -1777,7 +1777,7 @@ mod tests { pkt.is_key()).unwrap(); end_pts = Some(pts + pkt.duration() as i64); } - output.close(end_pts).unwrap(); + output.close(end_pts); db.syncer_channel.flush(); } diff --git a/src/streamer.rs b/src/streamer.rs index 6d16dc2..4166281 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -65,13 +65,6 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { redacted_url: String, } -struct WriterState<'a> { - writer: dir::Writer<'a>, - - /// Seconds since epoch at which to next rotate. - rotate: i64, -} - impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc, syncer_channel: dir::SyncerChannel, @@ -124,9 +117,12 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { }; debug!("{}: video_sample_entry_id={}", self.short_name, video_sample_entry_id); let mut seen_key_frame = false; - let mut state: Option = None; + + // Seconds since epoch at which to next rotate. + let mut rotate: Option = None; let mut transformed = Vec::new(); - let mut prev = None; + let mut w = dir::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id, + video_sample_entry_id); while !self.shutdown.load(Ordering::SeqCst) { let pkt = { let _t = TimerGuard::new(self.clocks, || "getting next packet"); @@ -141,18 +137,18 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { } let frame_realtime = self.clocks.monotonic() + realtime_offset; let local_time = recording::Time::new(frame_realtime); - state = if let Some(s) = state { - if frame_realtime.sec > s.rotate && pkt.is_key() { + rotate = if let Some(r) = rotate { + if frame_realtime.sec > r && pkt.is_key() { trace!("{}: write on normal rotation", self.short_name); let _t = TimerGuard::new(self.clocks, || "closing writer"); - prev = Some(s.writer.close(Some(pts))?); + w.close(Some(pts)); None } else { - Some(s) + Some(r) } } else { None }; - let mut s = match state { - Some(s) => s, + let r = match rotate { + Some(r) => r, None => { let sec = frame_realtime.sec; let r = sec - (sec % self.rotate_interval_sec) + self.rotate_offset_sec; @@ -162,15 +158,9 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // the one after, so that it's longer than usual rather than shorter than // usual. This ensures there's plenty of frame times to use when calculating // the start time. - let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 }; - + let r = r + if w.previously_opened()? { 0 } else { self.rotate_interval_sec }; let _t = TimerGuard::new(self.clocks, || "creating writer"); - let w = self.dir.create_writer(&self.db, &self.syncer_channel, prev, - self.stream_id, video_sample_entry_id)?; - WriterState{ - writer: w, - rotate: r, - } + r }, }; let orig_data = match pkt.data() { @@ -185,12 +175,12 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { }; let _t = TimerGuard::new(self.clocks, || format!("writing {} bytes", transformed_data.len())); - s.writer.write(transformed_data, local_time, pts, pkt.is_key())?; - state = Some(s); + w.write(transformed_data, local_time, pts, pkt.is_key())?; + rotate = Some(r); } - if let Some(s) = state { + if rotate.is_some() { let _t = TimerGuard::new(self.clocks, || "closing writer"); - s.writer.close(None)?; + w.close(None); } Ok(()) }