refine flush_if_sec behavior

The new behavior eliminates a couple unpleasant edge cases in which it
would never flush:

* if all recording stops, whatever was unflushed would stay that way
* if every recording attempt produces a 0-duration recording (such as if the
  camera sends only one frame and thus no PTS delta can be calculated),
  the list of recordings to flush would continue to grow
This commit is contained in:
Scott Lamb 2018-03-23 15:16:43 -07:00
parent addeb9d2f6
commit 91636d3193
7 changed files with 124 additions and 60 deletions

View File

@ -34,8 +34,9 @@ use failure::Error;
use libc; use libc;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::{Arc, mpsc};
use std::thread; use std::thread;
use std::time::Duration as StdDuration;
use time::{Duration, Timespec}; use time::{Duration, Timespec};
/// Abstract interface to the system clocks. This is for testability. /// Abstract interface to the system clocks. This is for testability.
@ -48,9 +49,14 @@ pub trait Clocks : Send + Sync + 'static {
/// Causes the current thread to sleep for the specified time. /// Causes the current thread to sleep for the specified time.
fn sleep(&self, how_long: Duration); fn sleep(&self, how_long: Duration);
/// Calls `rcv.recv_timeout` or substitutes a test implementation.
fn recv_timeout<T>(&self, rcv: &mpsc::Receiver<T>,
timeout: StdDuration) -> Result<T, mpsc::RecvTimeoutError>;
} }
pub fn retry_forever<T, E: Into<Error>>(clocks: &Clocks, f: &mut FnMut() -> Result<T, E>) -> T { pub fn retry_forever<C, T, E>(clocks: &C, f: &mut FnMut() -> Result<T, E>) -> T
where C: Clocks, E: Into<Error> {
loop { loop {
let e = match f() { let e = match f() {
Ok(t) => return t, Ok(t) => return t,
@ -85,6 +91,11 @@ impl Clocks for RealClocks {
Err(e) => warn!("Invalid duration {:?}: {}", how_long, e), Err(e) => warn!("Invalid duration {:?}: {}", how_long, e),
}; };
} }
fn recv_timeout<T>(&self, rcv: &mpsc::Receiver<T>,
timeout: StdDuration) -> Result<T, mpsc::RecvTimeoutError> {
rcv.recv_timeout(timeout)
}
} }
/// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied /// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied
@ -143,4 +154,14 @@ impl Clocks for SimulatedClocks {
let mut l = self.0.uptime.lock(); let mut l = self.0.uptime.lock();
*l = *l + how_long; *l = *l + how_long;
} }
/// Advances the clock by the specified amount if data is not immediately available.
fn recv_timeout<T>(&self, rcv: &mpsc::Receiver<T>,
timeout: StdDuration) -> Result<T, mpsc::RecvTimeoutError> {
let r = rcv.recv_timeout(StdDuration::new(0, 0));
if let Err(_) = r {
self.sleep(Duration::from_std(timeout).unwrap());
}
r
}
} }

View File

@ -367,9 +367,6 @@ pub struct Stream {
pub retain_bytes: i64, pub retain_bytes: i64,
pub flush_if_sec: i64, pub flush_if_sec: i64,
/// `flush_if_sec` converted to a duration for convenience.
pub flush_if: recording::Duration,
/// The time range of recorded data associated with this stream (minimum start time and maximum /// The time range of recorded data associated with this stream (minimum start time and maximum
/// end time). `None` iff there are no recordings for this camera. /// end time). `None` iff there are no recordings for this camera.
pub range: Option<Range<recording::Time>>, pub range: Option<Range<recording::Time>>,
@ -409,19 +406,6 @@ pub struct Stream {
synced_recordings: usize, synced_recordings: usize,
} }
impl Stream {
/// Returns the duration of synced but uncommitted recordings for the given stream.
/// Note recordings must be flushed in order, so a recording is considered unsynced if any
/// before it are unsynced.
pub(crate) fn unflushed(&self) -> recording::Duration {
let mut sum = 0;
for i in 0..self.synced_recordings {
sum += self.uncommitted[i].lock().duration_90k as i64;
}
recording::Duration(sum)
}
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct StreamChange { pub struct StreamChange {
pub sample_file_dir_id: Option<i32>, pub sample_file_dir_id: Option<i32>,
@ -675,8 +659,6 @@ impl StreamStateChanger {
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
record: sc.record, record: sc.record,
flush_if_sec: sc.flush_if_sec, flush_if_sec: sc.flush_if_sec,
flush_if: recording::Duration(sc.flush_if_sec *
recording::TIME_UNITS_PER_SEC),
..s ..s
}))); })));
} }
@ -711,7 +693,6 @@ impl StreamStateChanger {
rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()),
retain_bytes: 0, retain_bytes: 0,
flush_if_sec: sc.flush_if_sec, flush_if_sec: sc.flush_if_sec,
flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC),
range: None, range: None,
sample_file_bytes: 0, sample_file_bytes: 0,
to_delete: Vec::new(), to_delete: Vec::new(),
@ -917,7 +898,7 @@ impl LockedDatabase {
// Fix the range. // Fix the range.
s.range = new_range; s.range = new_range;
} }
info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", info!("Flush (why: {}): added {} recordings, deleted {}, marked {} files GCed.",
reason, added, deleted, gced); reason, added, deleted, gced);
for cb in &self.on_flush { for cb in &self.on_flush {
cb(); cb();
@ -1387,7 +1368,6 @@ impl LockedDatabase {
rtsp_path: row.get_checked(4)?, rtsp_path: row.get_checked(4)?,
retain_bytes: row.get_checked(5)?, retain_bytes: row.get_checked(5)?,
flush_if_sec, flush_if_sec,
flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC),
range: None, range: None,
sample_file_bytes: 0, sample_file_bytes: 0,
to_delete: Vec::new(), to_delete: Vec::new(),

View File

@ -156,6 +156,12 @@ impl fmt::Display for Time {
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct Duration(pub i64); pub struct Duration(pub i64);
impl Duration {
pub fn to_tm_duration(&self) -> time::Duration {
time::Duration::nanoseconds(self.0 * 100000 / 9)
}
}
impl fmt::Display for Duration { impl fmt::Display for Duration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut seconds = self.0 / TIME_UNITS_PER_SEC; let mut seconds = self.0 / TIME_UNITS_PER_SEC;

View File

@ -123,9 +123,17 @@ create table stream (
-- file. Older files will be deleted as necessary to stay within this limit. -- file. Older files will be deleted as necessary to stay within this limit.
retain_bytes integer not null check (retain_bytes >= 0), retain_bytes integer not null check (retain_bytes >= 0),
-- Flush the database when completing a recording if this stream has at -- Flush the database when the first instant of completed recording is this
-- least this many seconds of unflushed recordings. A value of 0 means that -- many seconds old. A value of 0 means that every completed recording will
-- every completed recording will cause a flush. -- cause an immediate flush. Higher values may allow flushes to be combined,
-- reducing SSD write cycles. For example, if all streams have a flush_if_sec
-- >= x sec, there will be:
--
-- * at most one flush per x sec in total
-- * at most x sec of completed but unflushed recordings per stream.
-- * at most x completed but unflushed recordings per stream, in the worst
-- case where a recording instantly fails, waits the 1-second retry delay,
-- then fails again, forever.
flush_if_sec integer not null, flush_if_sec integer not null,
-- The low 32 bits of the next recording id to assign for this stream. -- The low 32 bits of the next recording id to assign for this stream.

View File

@ -47,6 +47,8 @@ use std::os::unix::ffi::OsStrExt;
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::Duration as StdDuration;
use time::{Duration, Timespec};
pub trait DirWriter : 'static + Send { pub trait DirWriter : 'static + Send {
type File : FileWriter; type File : FileWriter;
@ -83,7 +85,7 @@ impl FileWriter for ::std::fs::File {
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
enum SyncerCommand<F> { enum SyncerCommand<F> {
AsyncSaveRecording(CompositeId, F), AsyncSaveRecording(CompositeId, recording::Duration, F),
DatabaseFlushed, DatabaseFlushed,
Flush(mpsc::SyncSender<()>), Flush(mpsc::SyncSender<()>),
} }
@ -101,6 +103,9 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
dir_id: i32, dir_id: i32,
dir: D, dir: D,
db: Arc<db::Database<C>>, db: Arc<db::Database<C>>,
/// Information about the next scheduled flush: monotonic time and reason.
next_flush: Option<(Timespec, String)>,
} }
/// Starts a syncer for the given sample file directory. /// Starts a syncer for the given sample file directory.
@ -204,8 +209,8 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
impl<F: FileWriter> SyncerChannel<F> { impl<F: FileWriter> SyncerChannel<F> {
/// Asynchronously syncs the given writer, closes it, records it into the database, and /// Asynchronously syncs the given writer, closes it, records it into the database, and
/// starts rotation. /// starts rotation.
fn async_save_recording(&self, id: CompositeId, f: F) { fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) {
self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap(); self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap();
} }
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete. /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
@ -279,6 +284,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
dir_id, dir_id,
dir, dir,
db, db,
next_flush: None,
}, d.path.clone())) }, d.path.clone()))
} }
@ -333,11 +339,37 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> { impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) { fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
loop { loop {
match cmds.recv() { // Wait for a command, the next_flush timeout (if specified), or channel disconnect.
Err(_) => return, // all senders have closed the channel; shutdown let next_flush = self.next_flush.take();
Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f), let cmd = match next_flush {
Ok(SyncerCommand::DatabaseFlushed) => self.collect_garbage(), None => match cmds.recv() {
Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. Err(_) => return, // all senders are gone.
Ok(cmd) => cmd,
},
Some((t, r)) => {
let now = self.db.clocks().monotonic();
// Calculate the timeout to use, mapping negative durations to 0.
let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0));
match cmds.recv_timeout(timeout) {
Err(mpsc::RecvTimeoutError::Disconnected) => return, // all senders gone.
Err(mpsc::RecvTimeoutError::Timeout) => {
self.flush(&r);
continue
},
Ok(cmd) => {
self.next_flush = Some((t, r));
cmd
},
}
},
};
// Have a command; handle it.
match cmd {
SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(),
SyncerCommand::Flush(_) => {}, // just drop the supplied sender, closing it.
}; };
} }
} }
@ -375,7 +407,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
/// so that there can be only one dir sync and database transaction per save. /// so that there can be only one dir sync and database transaction per save.
/// Internal helper for `save`. This is separated out so that the question-mark operator /// Internal helper for `save`. This is separated out so that the question-mark operator
/// can be used in the many error paths. /// can be used in the many error paths.
fn save(&mut self, id: CompositeId, f: D::File) { fn save(&mut self, id: CompositeId, duration: recording::Duration, f: D::File) {
let stream_id = id.stream(); let stream_id = id.stream();
// Free up a like number of bytes. // Free up a like number of bytes.
@ -384,25 +416,32 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
let mut db = self.db.lock(); let mut db = self.db.lock();
db.mark_synced(id).unwrap(); db.mark_synced(id).unwrap();
delete_recordings(&mut db, stream_id, 0).unwrap(); delete_recordings(&mut db, stream_id, 0).unwrap();
let reason = {
let s = db.streams_by_id().get(&stream_id).unwrap(); let s = db.streams_by_id().get(&stream_id).unwrap();
let c = db.cameras_by_id().get(&s.camera_id).unwrap(); let c = db.cameras_by_id().get(&s.camera_id).unwrap();
let unflushed = s.unflushed();
if unflushed < s.flush_if { // Schedule a flush.
debug!("{}-{}: unflushed={} < if={}, not flushing", let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration();
c.short_name, s.type_.as_str(), unflushed, s.flush_if); let now = self.db.clocks().monotonic();
let t = now + how_soon;
if let Some((nft, ref r)) = self.next_flush {
if nft <= t {
trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}",
c.short_name, s.type_.as_str(), how_soon, nft - now, &r);
return; return;
} }
format!("{}-{}: unflushed={} >= if={}", }
c.short_name, s.type_.as_str(), unflushed, s.flush_if) let reason = format!("{} sec after start of {} {}-{} recording",
}; s.flush_if_sec, duration, c.short_name, s.type_.as_str());
trace!("scheduling flush in {} because {}", how_soon, &reason);
self.next_flush = Some((t, reason));
}
if let Err(e) = db.flush(&reason) { fn flush(&mut self, reason: &str) {
// Don't retry the commit now in case it causes extra flash write cycles. if let Err(e) = self.db.lock().flush(reason) {
// It's not necessary for correctness to flush before proceeding. let d = Duration::minutes(1);
// Just wait until the next flush would happen naturally. warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e);
warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}", let t = self.db.clocks().monotonic() + Duration::minutes(1);
reason, e); self.next_flush = Some((t, "retry after flush failure".to_owned()));
} }
} }
} }
@ -639,25 +678,28 @@ impl<F: FileWriter> InnerWriter<F> {
fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>) -> PreviousWriter { fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>) -> PreviousWriter {
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
let (duration, flags) = match next_pts { let (last_sample_duration, flags) = match next_pts {
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0), Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
}; };
let mut sha1_bytes = [0u8; 20]; let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
let (local_time_delta, run_offset, end); let (local_time_delta, run_offset, end);
self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key,
unflushed.local_time);
let total_duration;
{ {
let mut l = self.r.lock(); let mut l = self.r.lock();
l.flags = flags; l.flags = flags;
local_time_delta = self.local_start - l.start; local_time_delta = self.local_start - l.start;
l.local_time_delta = local_time_delta; l.local_time_delta = local_time_delta;
l.sample_file_sha1 = sha1_bytes; l.sample_file_sha1 = sha1_bytes;
total_duration = recording::Duration(l.duration_90k as i64);
run_offset = l.run_offset; run_offset = l.run_offset;
end = l.start + recording::Duration(l.duration_90k as i64); end = l.start + total_duration;
} }
drop(self.r); drop(self.r);
channel.async_save_recording(self.id, self.f); channel.async_save_recording(self.id, total_duration, self.f);
PreviousWriter { PreviousWriter {
end, end,
local_time_delta, local_time_delta,
@ -776,7 +818,6 @@ mod tests {
} }
struct Harness { struct Harness {
//clocks: SimulatedClocks,
db: Arc<db::Database<SimulatedClocks>>, db: Arc<db::Database<SimulatedClocks>>,
dir_id: i32, dir_id: i32,
_tmpdir: ::tempdir::TempDir, _tmpdir: ::tempdir::TempDir,
@ -801,6 +842,7 @@ mod tests {
dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(), dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
dir: dir.clone(), dir: dir.clone(),
db: tdb.db.clone(), db: tdb.db.clone(),
next_flush: None,
}; };
let (snd, rcv) = mpsc::channel(); let (snd, rcv) = mpsc::channel();
tdb.db.lock().on_flush(Box::new({ tdb.db.lock().on_flush(Box::new({
@ -932,10 +974,16 @@ mod tests {
}))); })));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(())))); h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio())))); h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); let (gc_done_snd, gc_done_rcv) = mpsc::channel();
h.dir.expect(MockDirAction::Sync(Box::new(move || {
gc_done_snd.send(()).unwrap();
Ok(())
})));
drop(w); drop(w);
h.channel.flush(); // wait until the Save...
h.channel.flush(); // ...and the DatabaseFlush are processed. gc_done_rcv.recv().unwrap(); // Wait until the successful gc sync call...
h.channel.flush(); // ...and the DatabaseFlush op to complete.
f.ensure_done(); f.ensure_done();
h.dir.ensure_done(); h.dir.ensure_done();
} }

View File

@ -2189,6 +2189,7 @@ mod bench {
extern crate reqwest; extern crate reqwest;
extern crate test; extern crate test;
use base::clock::RealClocks;
use db::recording; use db::recording;
use db::testutil::{self, TestDb}; use db::testutil::{self, TestDb};
use futures::Stream; use futures::Stream;

View File

@ -564,7 +564,7 @@ mod bench {
impl Server { impl Server {
fn new() -> Server { fn new() -> Server {
let db = TestDb::new(); let db = TestDb::new(::base::clock::RealClocks {});
let test_camera_uuid = db.test_camera_uuid; let test_camera_uuid = db.test_camera_uuid;
testutil::add_dummy_recordings_to_db(&db.db, 1440); testutil::add_dummy_recordings_to_db(&db.db, 1440);
let (tx, rx) = ::std::sync::mpsc::channel(); let (tx, rx) = ::std::sync::mpsc::channel();