fix #64 (extraneous flushes)

Now each syncer has a binary heap of the times it plans to do a flush.
When one of those times arrives, it rechecks whether there's still
something to do. This seems more straightforward than rechecking each
stream's first uncommitted recording, especially given the logic to
retry failed flushes every minute.
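
For reference, the heap ordering relies on reversing Ord so that Rust's
max-heap BinaryHeap pops the soonest time first. A minimal,
self-contained sketch of just that trick (the real PlannedFlush in the
diff below also carries a recording id, a reason string, and test-only
senders):

    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    struct Planned { when: i64 }  // stand-in for the monotonic Timespec

    impl Ord for Planned {
        // Reversed: the smallest `when` compares greatest, so
        // heap.peek()/pop() yield the next flush due.
        fn cmp(&self, other: &Self) -> Ordering {
            other.when.cmp(&self.when)
        }
    }
    impl PartialOrd for Planned {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl PartialEq for Planned {
        fn eq(&self, other: &Self) -> bool { self.when == other.when }
    }
    impl Eq for Planned {}

    fn main() {
        let mut heap = BinaryHeap::new();
        heap.push(Planned { when: 30 });
        heap.push(Planned { when: 10 });
        assert_eq!(heap.peek().unwrap().when, 10);  // soonest first
    }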

Also improved the info! log for each flush to show the actual
recordings being flushed, for better debuggability.
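
The new log line leans on itertools' Itertools::join to render the id
lists, which is why itertools appears in the dependency changes below.
A tiny self-contained example, with plain integers standing in for
CompositeIds:

    use itertools::Itertools;  // provides .join() on iterators

    fn main() {
        let added = [1, 2, 3];
        // Prints: added 3 recordings (1, 2, 3)
        println!("added {} recordings ({})",
                 added.len(), added.iter().join(", "));
    }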

No new tests right now. :-( They're tricky to write. One problem is that
it's hard to get the timing right: a different flush has to happen
after Syncer::save's database operations and before Syncer::run calls
SimulatedClocks::recv_timeout with an empty channel[*], advancing the
time. I've thought of a few ways of doing this:

   * adding a new SyncerCommand to run something, but it's messy: the
     command has to be added from the mock of one of the actions done
     by the save, and Box<dyn FnOnce() + 'static> not working (see
     rust-lang/rust#28796) makes it especially annoying.

   * replacing SimulatedClocks with something more like MockClocks.
     Lots of boilerplate. Maybe I need to find a good general-purpose
     Rust mock library. (mockers sounds good but I want something that
     works on stable Rust.)

   * bypassing the Syncer::run loop, instead manually running iterations
     from the test.

Maybe the last way is the best for now. I'm likely to try it soon.
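
As a rough illustration of that last option (all names here are
hypothetical, not code from this commit): factor the loop body out so
the test drives one iteration at a time against a clock it fully
controls, instead of racing a real recv_timeout.

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    struct ToySyncer {
        now: i64,                           // test-controlled "monotonic clock"
        planned: BinaryHeap<Reverse<i64>>,  // times of planned flushes
    }

    impl ToySyncer {
        // One iteration of the would-be run loop: perform any flushes now due.
        fn step(&mut self) -> usize {
            let mut flushed = 0;
            while let Some(&Reverse(when)) = self.planned.peek() {
                if when > self.now {
                    break;
                }
                self.planned.pop();
                flushed += 1;
            }
            flushed
        }
    }

    fn main() {
        let mut s = ToySyncer { now: 0, planned: BinaryHeap::new() };
        s.planned.push(Reverse(60));
        assert_eq!(s.step(), 0);  // nothing due yet
        s.now = 60;               // the test, not a timeout, advances time
        assert_eq!(s.step(), 1);  // the planned flush fires deterministically
    }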

[*] actually, it's calling Receiver::recv_timeout directly;
Clocks::recv_timeout is dead code now? oops.
commit 1ce52e334c
parent 55fa458288
Author: Scott Lamb
Date:   2019-01-04 11:56:15 -08:00
5 changed files with 133 additions and 54 deletions

Cargo.lock (generated)

@@ -572,6 +572,14 @@ dependencies = [
  "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "itoa"
 version = "0.4.3"
@@ -822,6 +830,7 @@ dependencies = [
  "blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)",
  "libpasta 0.1.0-rc2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -834,6 +843,7 @@ dependencies = [
  "protobuf 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rusqlite 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "time 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
  "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2119,6 +2129,7 @@ dependencies = [
 "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
 "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
 "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d"
+"checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358"
 "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73"

Cargo.toml

@@ -8,7 +8,7 @@ edition = "2018"
 # The nightly feature is used within moonfire-nvr itself to gate the
 # benchmarks. Also pass it along to crates that can benefit from it.
-nightly = ["db/nightly", "parking_lot/nightly"]
+nightly = ["db/nightly", "parking_lot/nightly", "smallvec/union"]
 # The bundled feature includes bundled (aka statically linked) versions of
 # native libraries where possible.

db/Cargo.toml

@@ -28,6 +28,8 @@ parking_lot = { version = "0.7", features = [] }
 protobuf = "2.0"
 regex = "1.0"
 rusqlite = "0.16"
+smallvec = "0.6"
 tempdir = "0.3"
 time = "0.1"
 uuid = { version = "0.7", features = ["std", "v4"] }
+itertools = "0.8.0"

db/db.rs

@@ -60,11 +60,13 @@ use crate::recording::{self, TIME_UNITS_PER_SEC};
 use crate::schema;
 use failure::{Error, bail, format_err};
 use fnv::{FnvHashMap, FnvHashSet};
+use itertools::Itertools;
 use log::{error, info, trace};
 use lru_cache::LruCache;
 use openssl::hash;
 use parking_lot::{Mutex,MutexGuard};
 use rusqlite::types::ToSql;
+use smallvec::SmallVec;
 use std::collections::{BTreeMap, VecDeque};
 use std::cell::RefCell;
 use std::cmp;
@@ -912,23 +914,23 @@ impl LockedDatabase {
         tx.commit()?;
 
         // Process delete_garbage.
-        let mut gced = 0;
+        let mut gced = SmallVec::<[CompositeId; 8]>::new();
         for dir in self.sample_file_dirs_by_id.values_mut() {
-            gced += dir.garbage_unlinked.len();
-            dir.garbage_unlinked.clear();
+            gced.extend(dir.garbage_unlinked.drain(..));
         }
 
-        let mut added = 0;
-        let mut deleted = 0;
+        let mut added = SmallVec::<[CompositeId; 8]>::new();
+        let mut deleted = SmallVec::<[CompositeId; 8]>::new();
         for (stream_id, new_range) in new_ranges.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
             let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
 
             // Process delete_oldest_recordings.
-            deleted += s.to_delete.len();
             s.sample_file_bytes -= s.bytes_to_delete;
             s.bytes_to_delete = 0;
+            deleted.reserve(s.to_delete.len());
             for row in s.to_delete.drain(..) {
+                deleted.push(row.id);
                 d.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
@@ -936,11 +938,12 @@ impl LockedDatabase {
             }
 
             // Process add_recordings.
-            s.next_recording_id += s.synced_recordings as i32;
-            added += s.synced_recordings;
             s.bytes_to_add = 0;
+            added.reserve(s.synced_recordings);
             for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
+                added.push(CompositeId::new(stream_id, s.next_recording_id));
+                s.next_recording_id += 1;
                 let l = u.lock();
                 let end = l.start + recording::Duration(l.duration_90k as i64);
                 s.add_recording(l.start .. end, l.sample_file_bytes);
@@ -951,8 +954,9 @@ impl LockedDatabase {
             s.range = new_range;
         }
         self.auth.post_flush();
-        info!("Flush (why: {}): added {} recordings, deleted {}, marked {} files GCed.",
-              reason, added, deleted, gced);
+        info!("Flush (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
+              reason, added.len(), added.iter().join(", "), deleted.len(),
+              deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
         for cb in &self.on_flush {
             cb();
         }

db/writer.rs

@@ -41,6 +41,7 @@ use fnv::FnvHashMap;
 use parking_lot::Mutex;
 use log::{debug, info, trace, warn};
 use openssl::hash;
+use std::cmp::Ordering;
 use std::cmp;
 use std::io;
 use std::mem;
@@ -104,14 +105,47 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
     dir_id: i32,
     dir: D,
     db: Arc<db::Database<C>>,
-
-    /// Information about the next scheduled flush:
-    /// * monotonic time
-    /// * reason (for logging)
-    /// * senders to drop when this time is reached (for testing; see SyncerChannel::flush).
-    next_flush: Option<(Timespec, String, Vec<mpsc::SyncSender<()>>)>,
+    planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
 }
 
+struct PlannedFlush {
+    /// Monotonic time at which this flush should happen.
+    when: Timespec,
+
+    /// Recording which prompts this flush. If this recording is already flushed at the planned
+    /// time, it can be skipped.
+    recording: CompositeId,
+
+    /// A human-readable reason for the flush, for logs.
+    reason: String,
+
+    /// Senders to drop when this time is reached. This is for test instrumentation; see
+    /// `SyncerChannel::flush`.
+    senders: Vec<mpsc::SyncSender<()>>,
+}
+
+// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
+// PlannedFlush is greater than other if its when is _less_ than the other's.
+impl Ord for PlannedFlush {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.when.cmp(&self.when)
+    }
+}
+
+impl PartialOrd for PlannedFlush {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for PlannedFlush {
+    fn eq(&self, other: &Self) -> bool {
+        self.when == other.when
+    }
+}
+
+impl Eq for PlannedFlush {}
+
 /// Starts a syncer for the given sample file directory.
 ///
 /// The lock must not be held on `db` when this is called.
@@ -218,7 +252,7 @@
     }
 
     /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
-    /// including a scheduled database flush if any. Note this doesn't wait for any
+    /// including the next scheduled database flush (if any). Note this doesn't wait for any
     /// post-database flush garbage collection.
     pub fn flush(&self) {
         let (snd, rcv) = mpsc::sync_channel(0);
@@ -290,7 +324,7 @@
             dir_id,
             dir,
             db,
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         }, d.path.clone()))
     }
@@ -345,18 +379,14 @@
 impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
     fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
         loop {
-            // Wait for a command, the next_flush timeout (if specified), or channel disconnect.
-            let next_flush = self.next_flush.take();
+            // Wait for a command, the next flush timeout (if specified), or channel disconnect.
+            let next_flush = self.planned_flushes.peek().map(|f| f.when);
             let cmd = match next_flush {
                 None => match cmds.recv() {
                     Err(_) => return,  // all cmd senders are gone.
                     Ok(cmd) => cmd,
                 },
-                Some((t, r, flush_senders)) => {
-                    // Note: `flush_senders` will be dropped on exit from this block if left
-                    // unmoved, which has the desired behavior of closing the channels and
-                    // notifying the receivers the flush occurred.
-
+                Some(t) => {
                     let now = self.db.clocks().monotonic();
 
                     // Calculate the timeout to use, mapping negative durations to 0.
@@ -364,13 +394,10 @@
                     match cmds.recv_timeout(timeout) {
                         Err(mpsc::RecvTimeoutError::Disconnected) => return,  // cmd senders gone.
                         Err(mpsc::RecvTimeoutError::Timeout) => {
-                            self.flush(&r);
+                            self.flush();
                             continue
                         },
-                        Ok(cmd) => {
-                            self.next_flush = Some((t, r, flush_senders));
-                            cmd
-                        },
+                        Ok(cmd) => cmd,
                     }
                 },
             };
@@ -382,8 +409,8 @@
                 SyncerCommand::Flush(flush) => {
                     // The sender is waiting for the supplied writer to be dropped. If there's no
                     // timeout, do so immediately; otherwise wait for that timeout then drop it.
-                    if let Some((_, _, ref mut flushes)) = self.next_flush {
-                        flushes.push(flush);
+                    if let Some(mut f) = self.planned_flushes.peek_mut() {
+                        f.senders.push(flush);
                     }
                 },
             };
@@ -438,27 +465,65 @@
             // Schedule a flush.
             let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration();
             let now = self.db.clocks().monotonic();
-            let t = now + how_soon;
-            if let Some((nft, ref r, _)) = self.next_flush {
-                if nft <= t {
-                    trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}",
-                           c.short_name, s.type_.as_str(), how_soon, nft - now, &r);
-                    return;
-                }
-            }
-            let reason = format!("{} sec after start of {} {}-{} recording",
-                                 s.flush_if_sec, duration, c.short_name, s.type_.as_str());
+            let when = now + how_soon;
+            let reason = format!("{} sec after start of {} {}-{} recording {}",
+                                 s.flush_if_sec, duration, c.short_name, s.type_.as_str(), id);
             trace!("scheduling flush in {} because {}", how_soon, &reason);
-            self.next_flush = Some((t, reason, Vec::new()));
+            self.planned_flushes.push(PlannedFlush {
+                when,
+                reason,
+                recording: id,
+                senders: Vec::new(),
+            });
         }
 
-    fn flush(&mut self, reason: &str) {
-        if let Err(e) = self.db.lock().flush(reason) {
-            let d = Duration::minutes(1);
-            warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e);
-            let t = self.db.clocks().monotonic() + Duration::minutes(1);
-            self.next_flush = Some((t, "retry after flush failure".to_owned(), Vec::new()));
-        }
+    fn flush(&mut self) {
+        let mut l = self.db.lock();
+
+        // Look through the planned flushes and see if any are still relevant. It's possible
+        // they're not because something else (e.g., a syncer for a different sample file dir)
+        // has flushed the database in the meantime.
+        use std::collections::binary_heap::PeekMut;
+        while let Some(f) = self.planned_flushes.peek_mut() {
+            let s = match l.streams_by_id().get(&f.recording.stream()) {
+                Some(s) => s,
+                None => {
+                    // Removing streams while running hasn't been implemented yet, so this should
+                    // be impossible.
+                    warn!("bug: no stream for {} which was scheduled to be flushed", f.recording);
+                    PeekMut::pop(f);
+                    continue;
+                }
+            };
+            if s.next_recording_id <= f.recording.recording() {  // not yet committed.
+                break;
+            }
+
+            trace!("planned flush ({}) no longer needed", &f.reason);
+            PeekMut::pop(f);
+        }
+
+        // If there's anything left to do now, try to flush.
+        let f = match self.planned_flushes.peek() {
+            None => return,
+            Some(f) => f,
+        };
+        let now = self.db.clocks().monotonic();
+        if f.when > now {
+            return;
+        }
+        if let Err(e) = l.flush(&f.reason) {
+            let d = Duration::minutes(1);
+            warn!("flush failure on save for reason {}; will retry after {}: {:?}",
+                  f.reason, d, e);
+            self.planned_flushes.peek_mut().expect("planned_flushes is non-empty").when =
+                self.db.clocks().monotonic() + Duration::minutes(1);
+            return;
+        }
+
+        // A successful flush should take care of everything planned.
+        self.planned_flushes.clear();
     }
 }
@@ -861,7 +926,7 @@
             dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
             dir: dir.clone(),
             db: tdb.db.clone(),
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         };
         let (snd, rcv) = mpsc::channel();
         tdb.db.lock().on_flush(Box::new({
@@ -875,7 +940,6 @@
             .spawn(move || syncer.run(rcv)).unwrap();
         Harness {
-            //clocks,
             dir_id,
             dir,
             db: tdb.db,
@@ -941,8 +1005,6 @@
             gc_done_snd.send(()).unwrap();
             Ok(())
         })));
 
-        //h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
-        //h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         drop(w);