fix #64 (extraneous flushes)

Now each syncer has a binary heap of the times it plans to do a flush.
When one of those times arrives, it rechecks if there's something to do.
Seems more straightforward than rechecking each stream's first
uncommitted recording, especially with the logic to retry failed flushes
every minute.
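
The heap is ordered so the soonest planned flush pops first. A minimal,
self-contained sketch of that ordering trick (std's BinaryHeap is a
max-heap, so Ord is reversed; the real PlannedFlush in the diff below
also carries the recording ID, reason, and test senders):

    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    #[derive(Eq, PartialEq)]
    struct Planned { when: u64 }  // stand-in for the monotonic Timespec

    impl Ord for Planned {
        // Reversed: an earlier `when` compares greater, so the max-heap
        // yields the soonest flush first.
        fn cmp(&self, other: &Self) -> Ordering { other.when.cmp(&self.when) }
    }
    impl PartialOrd for Planned {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
    }

    fn main() {
        let mut heap = BinaryHeap::new();
        heap.push(Planned { when: 30 });
        heap.push(Planned { when: 10 });
        heap.push(Planned { when: 20 });
        assert_eq!(heap.pop().unwrap().when, 10);  // soonest first
    }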

Also improved the info! log for each flush to show the actual recordings
being flushed, for better debuggability.
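
The new log format leans on itertools' join (the dependency added in the
diff below); a tiny standalone illustration of the shape, with made-up
recording IDs:

    use itertools::Itertools;  // provides .join() on iterators

    fn main() {
        let added = vec![1, 2, 3];  // hypothetical recording IDs
        // Count plus the joined list, like the new info! line:
        println!("added {} recordings ({})", added.len(), added.iter().join(", "));
        // prints: added 3 recordings (1, 2, 3)
    }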

No new tests right now. :-( They're tricky to write. One problem is that
it's hard to get the timing right: a different flush has to happen
after Syncer::save's database operations and before Syncer::run calls
SimulatedClocks::recv_timeout with an empty channel[*], advancing the
time. I've thought of a few ways of doing this:

   * adding a new SyncerCommand to run arbitrary code, but it's messy (it
     has to be added from the mock of one of the actions done by the
     save), and Box<dyn FnOnce() + 'static> not being callable on stable
     (see rust-lang/rust#28796) makes it especially annoying; see the
     sketch after this list.

   * replacing SimulatedClocks with something more like MockClocks.
     Lots of boilerplate. Maybe I need to find a good general-purpose
     Rust mock library. (mockers sounds good but I want something that
     works on stable Rust.)

   * bypassing the Syncer::run loop, instead manually running iterations
     from the test.
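
On the first option's annoyance: on today's stable compilers, a boxed
FnOnce can't be invoked (rust-lang/rust#28796). The usual workaround is
a helper trait that takes self by Box; a sketch with illustrative names,
not from this codebase:

    // `let f: Box<dyn FnOnce() + Send> = Box::new(|| ()); f();` fails with
    // E0161 on stable: the closure can't be moved out of the Box to call it.
    trait RunOnce {
        fn run_boxed(self: Box<Self>);
    }

    impl<F: FnOnce()> RunOnce for F {
        // Box is special: `*self` may be moved out, so the FnOnce can be called.
        fn run_boxed(self: Box<Self>) { (*self)() }
    }

    fn main() {
        let f: Box<dyn RunOnce + Send> = Box::new(|| println!("queued closure ran"));
        f.run_boxed();
    }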

Maybe the last way is the best for now. I'm likely to try it soon.
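
A rough, toy sketch of that third option (nothing here is the real
Syncer; `step` is an invented single-iteration helper split out of the
run loop):

    use std::sync::mpsc;

    enum Command { Flush }

    struct Syncer { flushes_done: usize }

    impl Syncer {
        // One iteration of what run() would do, minus the blocking recv.
        fn step(&mut self, cmd: Command) {
            match cmd { Command::Flush => self.flushes_done += 1 }
        }
    }

    #[test]
    fn manual_iterations() {
        let (snd, rcv) = mpsc::channel();
        snd.send(Command::Flush).unwrap();
        let mut s = Syncer { flushes_done: 0 };
        while let Ok(cmd) = rcv.try_recv() {
            s.step(cmd);  // the test controls exactly when each iteration runs
        }
        assert_eq!(s.flushes_done, 1);
    }

This sidesteps the timing problem entirely: no thread, no recv_timeout,
no simulated-clock advance.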

[*] actually, it's calling Receiver::recv_timeout directly;
Clocks::recv_timeout is dead code now? oops.
Scott Lamb, 2019-01-04 11:56:15 -08:00
commit 1ce52e334c (parent 55fa458288)
5 changed files with 133 additions and 54 deletions

Cargo.lock (generated)

@@ -572,6 +572,14 @@ dependencies = [
  "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "itoa"
 version = "0.4.3"
@@ -822,6 +830,7 @@ dependencies = [
  "blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)",
  "libpasta 0.1.0-rc2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -834,6 +843,7 @@ dependencies = [
  "protobuf 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rusqlite 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "time 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
  "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2119,6 +2129,7 @@ dependencies = [
 "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
 "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
 "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d"
+"checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358"
 "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73"

Cargo.toml

@@ -8,7 +8,7 @@ edition = "2018"
 
 # The nightly feature is used within moonfire-nvr itself to gate the
 # benchmarks. Also pass it along to crates that can benefit from it.
-nightly = ["db/nightly", "parking_lot/nightly"]
+nightly = ["db/nightly", "parking_lot/nightly", "smallvec/union"]
 
 # The bundled feature includes bundled (aka statically linked) versions of
 # native libraries where possible.

db/Cargo.toml

@@ -28,6 +28,8 @@ parking_lot = { version = "0.7", features = [] }
 protobuf = "2.0"
 regex = "1.0"
 rusqlite = "0.16"
+smallvec = "0.6"
 tempdir = "0.3"
 time = "0.1"
 uuid = { version = "0.7", features = ["std", "v4"] }
+itertools = "0.8.0"

db/db.rs

@@ -60,11 +60,13 @@ use crate::recording::{self, TIME_UNITS_PER_SEC};
 use crate::schema;
 use failure::{Error, bail, format_err};
 use fnv::{FnvHashMap, FnvHashSet};
+use itertools::Itertools;
 use log::{error, info, trace};
 use lru_cache::LruCache;
 use openssl::hash;
 use parking_lot::{Mutex,MutexGuard};
 use rusqlite::types::ToSql;
+use smallvec::SmallVec;
 use std::collections::{BTreeMap, VecDeque};
 use std::cell::RefCell;
 use std::cmp;
@@ -912,23 +914,23 @@ impl LockedDatabase {
         tx.commit()?;
 
         // Process delete_garbage.
-        let mut gced = 0;
+        let mut gced = SmallVec::<[CompositeId; 8]>::new();
         for dir in self.sample_file_dirs_by_id.values_mut() {
-            gced += dir.garbage_unlinked.len();
-            dir.garbage_unlinked.clear();
+            gced.extend(dir.garbage_unlinked.drain(..));
         }
 
-        let mut added = 0;
-        let mut deleted = 0;
+        let mut added = SmallVec::<[CompositeId; 8]>::new();
+        let mut deleted = SmallVec::<[CompositeId; 8]>::new();
         for (stream_id, new_range) in new_ranges.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
             let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
 
             // Process delete_oldest_recordings.
-            deleted += s.to_delete.len();
             s.sample_file_bytes -= s.bytes_to_delete;
             s.bytes_to_delete = 0;
+            deleted.reserve(s.to_delete.len());
             for row in s.to_delete.drain(..) {
+                deleted.push(row.id);
                 d.garbage_needs_unlink.insert(row.id);
                 let d = recording::Duration(row.duration as i64);
                 s.duration -= d;
@@ -936,11 +938,12 @@ impl LockedDatabase {
             }
 
             // Process add_recordings.
-            s.next_recording_id += s.synced_recordings as i32;
-            added += s.synced_recordings;
             s.bytes_to_add = 0;
+            added.reserve(s.synced_recordings);
             for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
+                added.push(CompositeId::new(stream_id, s.next_recording_id));
+                s.next_recording_id += 1;
                 let l = u.lock();
                 let end = l.start + recording::Duration(l.duration_90k as i64);
                 s.add_recording(l.start .. end, l.sample_file_bytes);
@@ -951,8 +954,9 @@ impl LockedDatabase {
             s.range = new_range;
         }
         self.auth.post_flush();
-        info!("Flush (why: {}): added {} recordings, deleted {}, marked {} files GCed.",
-              reason, added, deleted, gced);
+        info!("Flush (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
+              reason, added.len(), added.iter().join(", "), deleted.len(),
+              deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
         for cb in &self.on_flush {
             cb();
         }

db/writer.rs

@@ -41,6 +41,7 @@ use fnv::FnvHashMap;
 use parking_lot::Mutex;
 use log::{debug, info, trace, warn};
 use openssl::hash;
+use std::cmp::Ordering;
 use std::cmp;
 use std::io;
 use std::mem;
@@ -104,14 +105,47 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
     dir_id: i32,
     dir: D,
     db: Arc<db::Database<C>>,
-
-    /// Information about the next scheduled flush:
-    /// * monotonic time
-    /// * reason (for logging)
-    /// * senders to drop when this time is reached (for testing; see SyncerChannel::flush).
-    next_flush: Option<(Timespec, String, Vec<mpsc::SyncSender<()>>)>,
+    planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
 }
 
+struct PlannedFlush {
+    /// Monotonic time at which this flush should happen.
+    when: Timespec,
+
+    /// Recording which prompts this flush. If this recording is already flushed at the planned
+    /// time, it can be skipped.
+    recording: CompositeId,
+
+    /// A human-readable reason for the flush, for logs.
+    reason: String,
+
+    /// Senders to drop when this time is reached. This is for test instrumentation; see
+    /// `SyncerChannel::flush`.
+    senders: Vec<mpsc::SyncSender<()>>,
+}
+
+// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
+// PlannedFlush is greater than other if its when is _less_ than the other's.
+impl Ord for PlannedFlush {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.when.cmp(&self.when)
+    }
+}
+
+impl PartialOrd for PlannedFlush {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for PlannedFlush {
+    fn eq(&self, other: &Self) -> bool {
+        self.when == other.when
+    }
+}
+
+impl Eq for PlannedFlush {}
+
 /// Starts a syncer for the given sample file directory.
 ///
 /// The lock must not be held on `db` when this is called.
@@ -218,7 +252,7 @@ impl<F: FileWriter> SyncerChannel<F> {
     }
 
     /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
-    /// including a scheduled database flush if any. Note this doesn't wait for any
+    /// including the next scheduled database flush (if any). Note this doesn't wait for any
    /// post-database flush garbage collection.
     pub fn flush(&self) {
         let (snd, rcv) = mpsc::sync_channel(0);
@@ -290,7 +324,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
             dir_id,
             dir,
             db,
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         }, d.path.clone()))
     }
@@ -345,18 +379,14 @@
 impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
     fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
         loop {
-            // Wait for a command, the next_flush timeout (if specified), or channel disconnect.
-            let next_flush = self.next_flush.take();
+            // Wait for a command, the next flush timeout (if specified), or channel disconnect.
+            let next_flush = self.planned_flushes.peek().map(|f| f.when);
             let cmd = match next_flush {
                 None => match cmds.recv() {
                     Err(_) => return,  // all cmd senders are gone.
                     Ok(cmd) => cmd,
                 },
-                Some((t, r, flush_senders)) => {
-                    // Note: `flush_senders` will be dropped on exit from this block if left
-                    // unmoved, which has the desired behavior of closing the channels and
-                    // notifying the receivers the flush occurred.
+                Some(t) => {
                     let now = self.db.clocks().monotonic();
 
                     // Calculate the timeout to use, mapping negative durations to 0.
@@ -364,13 +394,10 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
                     match cmds.recv_timeout(timeout) {
                         Err(mpsc::RecvTimeoutError::Disconnected) => return,  // cmd senders gone.
                         Err(mpsc::RecvTimeoutError::Timeout) => {
-                            self.flush(&r);
+                            self.flush();
                             continue
                         },
-                        Ok(cmd) => {
-                            self.next_flush = Some((t, r, flush_senders));
-                            cmd
-                        },
+                        Ok(cmd) => cmd,
                     }
                 },
             };
@@ -382,8 +409,8 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
                 SyncerCommand::Flush(flush) => {
                     // The sender is waiting for the supplied writer to be dropped. If there's no
                     // timeout, do so immediately; otherwise wait for that timeout then drop it.
-                    if let Some((_, _, ref mut flushes)) = self.next_flush {
-                        flushes.push(flush);
+                    if let Some(mut f) = self.planned_flushes.peek_mut() {
+                        f.senders.push(flush);
                     }
                 },
             };
@@ -438,27 +465,65 @@
         // Schedule a flush.
         let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration();
         let now = self.db.clocks().monotonic();
-        let t = now + how_soon;
-        if let Some((nft, ref r, _)) = self.next_flush {
-            if nft <= t {
-                trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}",
-                       c.short_name, s.type_.as_str(), how_soon, nft - now, &r);
-                return;
-            }
-        }
-        let reason = format!("{} sec after start of {} {}-{} recording",
-                             s.flush_if_sec, duration, c.short_name, s.type_.as_str());
+        let when = now + how_soon;
+        let reason = format!("{} sec after start of {} {}-{} recording {}",
+                             s.flush_if_sec, duration, c.short_name, s.type_.as_str(), id);
         trace!("scheduling flush in {} because {}", how_soon, &reason);
-        self.next_flush = Some((t, reason, Vec::new()));
+        self.planned_flushes.push(PlannedFlush {
+            when,
+            reason,
+            recording: id,
+            senders: Vec::new(),
+        });
     }
 
-    fn flush(&mut self, reason: &str) {
-        if let Err(e) = self.db.lock().flush(reason) {
-            let d = Duration::minutes(1);
-            warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e);
-            let t = self.db.clocks().monotonic() + Duration::minutes(1);
-            self.next_flush = Some((t, "retry after flush failure".to_owned(), Vec::new()));
+    fn flush(&mut self) {
+        let mut l = self.db.lock();
+
+        // Look through the planned flushes and see if any are still relevant. It's possible
+        // they're not because something else (e.g., a syncer for a different sample file dir)
+        // has flushed the database in the meantime.
+        use std::collections::binary_heap::PeekMut;
+        while let Some(f) = self.planned_flushes.peek_mut() {
+            let s = match l.streams_by_id().get(&f.recording.stream()) {
+                Some(s) => s,
+                None => {
+                    // Removing streams while running hasn't been implemented yet, so this should
+                    // be impossible.
+                    warn!("bug: no stream for {} which was scheduled to be flushed", f.recording);
+                    PeekMut::pop(f);
+                    continue;
+                }
+            };
+            if s.next_recording_id <= f.recording.recording() {  // not yet committed.
+                break;
+            }
+
+            trace!("planned flush ({}) no longer needed", &f.reason);
+            PeekMut::pop(f);
         }
+
+        // If there's anything left to do now, try to flush.
+        let f = match self.planned_flushes.peek() {
+            None => return,
+            Some(f) => f,
+        };
+        let now = self.db.clocks().monotonic();
+        if f.when > now {
+            return;
+        }
+        if let Err(e) = l.flush(&f.reason) {
+            let d = Duration::minutes(1);
+            warn!("flush failure on save for reason {}; will retry after {}: {:?}",
+                  f.reason, d, e);
+            self.planned_flushes.peek_mut().expect("planned_flushes is non-empty").when =
+                self.db.clocks().monotonic() + Duration::minutes(1);
+            return;
+        }
+
+        // A successful flush should take care of everything planned.
+        self.planned_flushes.clear();
     }
 }
@@ -861,7 +926,7 @@ mod tests {
             dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
             dir: dir.clone(),
             db: tdb.db.clone(),
-            next_flush: None,
+            planned_flushes: std::collections::BinaryHeap::new(),
         };
         let (snd, rcv) = mpsc::channel();
         tdb.db.lock().on_flush(Box::new({
@@ -875,7 +940,6 @@ mod tests {
                    .spawn(move || syncer.run(rcv)).unwrap();
         Harness {
-            //clocks,
             dir_id,
             dir,
             db: tdb.db,
@@ -941,8 +1005,6 @@ mod tests {
             gc_done_snd.send(()).unwrap();
             Ok(())
         })));
-        //h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
-        //h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         drop(w);