From 91636d3193d122e134f3fb748a0c314ca4bb5bbd Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Fri, 23 Mar 2018 15:16:43 -0700 Subject: [PATCH] refine flush_if_sec behavior The new behavior eliminates a couple unpleasant edge cases in which it would never flush: * if all recording stops, whatever was unflushed would stay that way * if every recording attempt produces a 0-duration recording (such as if the camera sends only one frame and thus no PTS delta can be calculated), the list of recordings to flush would continue to grow --- base/clock.rs | 25 ++++++++++- db/db.rs | 22 +--------- db/recording.rs | 6 +++ db/schema.sql | 14 ++++-- db/writer.rs | 114 ++++++++++++++++++++++++++++++++++-------------- src/mp4.rs | 1 + src/web.rs | 2 +- 7 files changed, 124 insertions(+), 60 deletions(-) diff --git a/base/clock.rs b/base/clock.rs index 8c63edf..ab3176d 100644 --- a/base/clock.rs +++ b/base/clock.rs @@ -34,8 +34,9 @@ use failure::Error; use libc; use parking_lot::Mutex; use std::mem; -use std::sync::Arc; +use std::sync::{Arc, mpsc}; use std::thread; +use std::time::Duration as StdDuration; use time::{Duration, Timespec}; /// Abstract interface to the system clocks. This is for testability. @@ -48,9 +49,14 @@ pub trait Clocks : Send + Sync + 'static { /// Causes the current thread to sleep for the specified time. fn sleep(&self, how_long: Duration); + + /// Calls `rcv.recv_timeout` or substitutes a test implementation. + fn recv_timeout(&self, rcv: &mpsc::Receiver, + timeout: StdDuration) -> Result; } -pub fn retry_forever>(clocks: &Clocks, f: &mut FnMut() -> Result) -> T { +pub fn retry_forever(clocks: &C, f: &mut FnMut() -> Result) -> T +where C: Clocks, E: Into { loop { let e = match f() { Ok(t) => return t, @@ -85,6 +91,11 @@ impl Clocks for RealClocks { Err(e) => warn!("Invalid duration {:?}: {}", how_long, e), }; } + + fn recv_timeout(&self, rcv: &mpsc::Receiver, + timeout: StdDuration) -> Result { + rcv.recv_timeout(timeout) + } } /// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied @@ -143,4 +154,14 @@ impl Clocks for SimulatedClocks { let mut l = self.0.uptime.lock(); *l = *l + how_long; } + + /// Advances the clock by the specified amount if data is not immediately available. + fn recv_timeout(&self, rcv: &mpsc::Receiver, + timeout: StdDuration) -> Result { + let r = rcv.recv_timeout(StdDuration::new(0, 0)); + if let Err(_) = r { + self.sleep(Duration::from_std(timeout).unwrap()); + } + r + } } diff --git a/db/db.rs b/db/db.rs index 093138c..c8308dc 100644 --- a/db/db.rs +++ b/db/db.rs @@ -367,9 +367,6 @@ pub struct Stream { pub retain_bytes: i64, pub flush_if_sec: i64, - /// `flush_if_sec` converted to a duration for convenience. - pub flush_if: recording::Duration, - /// The time range of recorded data associated with this stream (minimum start time and maximum /// end time). `None` iff there are no recordings for this camera. pub range: Option>, @@ -409,19 +406,6 @@ pub struct Stream { synced_recordings: usize, } -impl Stream { - /// Returns the duration of synced but uncommitted recordings for the given stream. - /// Note recordings must be flushed in order, so a recording is considered unsynced if any - /// before it are unsynced. - pub(crate) fn unflushed(&self) -> recording::Duration { - let mut sum = 0; - for i in 0..self.synced_recordings { - sum += self.uncommitted[i].lock().duration_90k as i64; - } - recording::Duration(sum) - } -} - #[derive(Clone, Debug, Default)] pub struct StreamChange { pub sample_file_dir_id: Option, @@ -675,8 +659,6 @@ impl StreamStateChanger { rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), record: sc.record, flush_if_sec: sc.flush_if_sec, - flush_if: recording::Duration(sc.flush_if_sec * - recording::TIME_UNITS_PER_SEC), ..s }))); } @@ -711,7 +693,6 @@ impl StreamStateChanger { rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), retain_bytes: 0, flush_if_sec: sc.flush_if_sec, - flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, to_delete: Vec::new(), @@ -917,7 +898,7 @@ impl LockedDatabase { // Fix the range. s.range = new_range; } - info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", + info!("Flush (why: {}): added {} recordings, deleted {}, marked {} files GCed.", reason, added, deleted, gced); for cb in &self.on_flush { cb(); @@ -1387,7 +1368,6 @@ impl LockedDatabase { rtsp_path: row.get_checked(4)?, retain_bytes: row.get_checked(5)?, flush_if_sec, - flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, to_delete: Vec::new(), diff --git a/db/recording.rs b/db/recording.rs index ed16272..882ccf0 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -156,6 +156,12 @@ impl fmt::Display for Time { #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] pub struct Duration(pub i64); +impl Duration { + pub fn to_tm_duration(&self) -> time::Duration { + time::Duration::nanoseconds(self.0 * 100000 / 9) + } +} + impl fmt::Display for Duration { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut seconds = self.0 / TIME_UNITS_PER_SEC; diff --git a/db/schema.sql b/db/schema.sql index 3abfed9..a65ae46 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -123,9 +123,17 @@ create table stream ( -- file. Older files will be deleted as necessary to stay within this limit. retain_bytes integer not null check (retain_bytes >= 0), - -- Flush the database when completing a recording if this stream has at - -- least this many seconds of unflushed recordings. A value of 0 means that - -- every completed recording will cause a flush. + -- Flush the database when the first instant of completed recording is this + -- many seconds old. A value of 0 means that every completed recording will + -- cause an immediate flush. Higher values may allow flushes to be combined, + -- reducing SSD write cycles. For example, if all streams have a flush_if_sec + -- >= x sec, there will be: + -- + -- * at most one flush per x sec in total + -- * at most x sec of completed but unflushed recordings per stream. + -- * at most x completed but unflushed recordings per stream, in the worst + -- case where a recording instantly fails, waits the 1-second retry delay, + -- then fails again, forever. flush_if_sec integer not null, -- The low 32 bits of the next recording id to assign for this stream. diff --git a/db/writer.rs b/db/writer.rs index 55c7488..7cfd36d 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -47,6 +47,8 @@ use std::os::unix::ffi::OsStrExt; use std::sync::Arc; use std::sync::mpsc; use std::thread; +use std::time::Duration as StdDuration; +use time::{Duration, Timespec}; pub trait DirWriter : 'static + Send { type File : FileWriter; @@ -83,7 +85,7 @@ impl FileWriter for ::std::fs::File { /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { - AsyncSaveRecording(CompositeId, F), + AsyncSaveRecording(CompositeId, recording::Duration, F), DatabaseFlushed, Flush(mpsc::SyncSender<()>), } @@ -101,6 +103,9 @@ struct Syncer { dir_id: i32, dir: D, db: Arc>, + + /// Information about the next scheduled flush: monotonic time and reason. + next_flush: Option<(Timespec, String)>, } /// Starts a syncer for the given sample file directory. @@ -204,8 +209,8 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - fn async_save_recording(&self, id: CompositeId, f: F) { - self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap(); + fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) { + self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. @@ -279,6 +284,7 @@ impl Syncer> { dir_id, dir, db, + next_flush: None, }, d.path.clone())) } @@ -333,11 +339,37 @@ impl Syncer> { impl Syncer { fn run(&mut self, cmds: mpsc::Receiver>) { loop { - match cmds.recv() { - Err(_) => return, // all senders have closed the channel; shutdown - Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f), - Ok(SyncerCommand::DatabaseFlushed) => self.collect_garbage(), - Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. + // Wait for a command, the next_flush timeout (if specified), or channel disconnect. + let next_flush = self.next_flush.take(); + let cmd = match next_flush { + None => match cmds.recv() { + Err(_) => return, // all senders are gone. + Ok(cmd) => cmd, + }, + Some((t, r)) => { + let now = self.db.clocks().monotonic(); + + // Calculate the timeout to use, mapping negative durations to 0. + let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0)); + match cmds.recv_timeout(timeout) { + Err(mpsc::RecvTimeoutError::Disconnected) => return, // all senders gone. + Err(mpsc::RecvTimeoutError::Timeout) => { + self.flush(&r); + continue + }, + Ok(cmd) => { + self.next_flush = Some((t, r)); + cmd + }, + } + }, + }; + + // Have a command; handle it. + match cmd { + SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f), + SyncerCommand::DatabaseFlushed => self.collect_garbage(), + SyncerCommand::Flush(_) => {}, // just drop the supplied sender, closing it. }; } } @@ -375,7 +407,7 @@ impl Syncer { /// so that there can be only one dir sync and database transaction per save. /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save(&mut self, id: CompositeId, f: D::File) { + fn save(&mut self, id: CompositeId, duration: recording::Duration, f: D::File) { let stream_id = id.stream(); // Free up a like number of bytes. @@ -384,25 +416,32 @@ impl Syncer { let mut db = self.db.lock(); db.mark_synced(id).unwrap(); delete_recordings(&mut db, stream_id, 0).unwrap(); - let reason = { - let s = db.streams_by_id().get(&stream_id).unwrap(); - let c = db.cameras_by_id().get(&s.camera_id).unwrap(); - let unflushed = s.unflushed(); - if unflushed < s.flush_if { - debug!("{}-{}: unflushed={} < if={}, not flushing", - c.short_name, s.type_.as_str(), unflushed, s.flush_if); + let s = db.streams_by_id().get(&stream_id).unwrap(); + let c = db.cameras_by_id().get(&s.camera_id).unwrap(); + + // Schedule a flush. + let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration(); + let now = self.db.clocks().monotonic(); + let t = now + how_soon; + if let Some((nft, ref r)) = self.next_flush { + if nft <= t { + trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}", + c.short_name, s.type_.as_str(), how_soon, nft - now, &r); return; } - format!("{}-{}: unflushed={} >= if={}", - c.short_name, s.type_.as_str(), unflushed, s.flush_if) - }; + } + let reason = format!("{} sec after start of {} {}-{} recording", + s.flush_if_sec, duration, c.short_name, s.type_.as_str()); + trace!("scheduling flush in {} because {}", how_soon, &reason); + self.next_flush = Some((t, reason)); + } - if let Err(e) = db.flush(&reason) { - // Don't retry the commit now in case it causes extra flash write cycles. - // It's not necessary for correctness to flush before proceeding. - // Just wait until the next flush would happen naturally. - warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}", - reason, e); + fn flush(&mut self, reason: &str) { + if let Err(e) = self.db.lock().flush(reason) { + let d = Duration::minutes(1); + warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e); + let t = self.db.clocks().monotonic() + Duration::minutes(1); + self.next_flush = Some((t, "retry after flush failure".to_owned())); } } } @@ -639,25 +678,28 @@ impl InnerWriter { fn close(mut self, channel: &SyncerChannel, next_pts: Option) -> PreviousWriter { let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); - let (duration, flags) = match next_pts { + let (last_sample_duration, flags) = match next_pts { None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0), }; let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); let (local_time_delta, run_offset, end); - self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); + self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key, + unflushed.local_time); + let total_duration; { let mut l = self.r.lock(); l.flags = flags; local_time_delta = self.local_start - l.start; l.local_time_delta = local_time_delta; l.sample_file_sha1 = sha1_bytes; + total_duration = recording::Duration(l.duration_90k as i64); run_offset = l.run_offset; - end = l.start + recording::Duration(l.duration_90k as i64); + end = l.start + total_duration; } drop(self.r); - channel.async_save_recording(self.id, self.f); + channel.async_save_recording(self.id, total_duration, self.f); PreviousWriter { end, local_time_delta, @@ -776,7 +818,6 @@ mod tests { } struct Harness { - //clocks: SimulatedClocks, db: Arc>, dir_id: i32, _tmpdir: ::tempdir::TempDir, @@ -801,6 +842,7 @@ mod tests { dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(), dir: dir.clone(), db: tdb.db.clone(), + next_flush: None, }; let (snd, rcv) = mpsc::channel(); tdb.db.lock().on_flush(Box::new({ @@ -932,10 +974,16 @@ mod tests { }))); h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(())))); h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio())))); - h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); + let (gc_done_snd, gc_done_rcv) = mpsc::channel(); + h.dir.expect(MockDirAction::Sync(Box::new(move || { + gc_done_snd.send(()).unwrap(); + Ok(()) + }))); + drop(w); - h.channel.flush(); // wait until the Save... - h.channel.flush(); // ...and the DatabaseFlush are processed. + + gc_done_rcv.recv().unwrap(); // Wait until the successful gc sync call... + h.channel.flush(); // ...and the DatabaseFlush op to complete. f.ensure_done(); h.dir.ensure_done(); } diff --git a/src/mp4.rs b/src/mp4.rs index 85cc8ef..ff485b1 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -2189,6 +2189,7 @@ mod bench { extern crate reqwest; extern crate test; + use base::clock::RealClocks; use db::recording; use db::testutil::{self, TestDb}; use futures::Stream; diff --git a/src/web.rs b/src/web.rs index 45b2593..e54cc18 100644 --- a/src/web.rs +++ b/src/web.rs @@ -564,7 +564,7 @@ mod bench { impl Server { fn new() -> Server { - let db = TestDb::new(); + let db = TestDb::new(::base::clock::RealClocks {}); let test_camera_uuid = db.test_camera_uuid; testutil::add_dummy_recordings_to_db(&db.db, 1440); let (tx, rx) = ::std::sync::mpsc::channel();