properly test fix for #64

I went with the third idea in 1ce52e3: have the tests run each iteration
of the syncer explicitly. These are messy tests that know tons of
internal details, but I think they're less confusing and racy than if I
had the syncer running in a separate thread.
This commit is contained in:
Scott Lamb 2019-01-04 16:11:58 -08:00
parent 1ce52e334c
commit 4cc796f697
4 changed files with 305 additions and 212 deletions

View File

@ -589,6 +589,7 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam
pub struct LockedDatabase {
conn: rusqlite::Connection,
uuid: Uuid,
flush_count: usize,
/// If the database is open in read-write mode, the information about the current Open row.
open: Option<Open>,
@ -783,6 +784,9 @@ impl LockedDatabase {
&self.sample_file_dirs_by_id
}
/// Returns the number of completed database flushes since startup.
pub fn flushes(&self) -> usize { self.flush_count }
/// Adds a placeholder for an uncommitted recording.
/// The caller should write samples and fill the returned `RecordingToInsert` as it goes
/// (noting that while holding the lock, it should not perform I/O or acquire the database
@ -954,8 +958,9 @@ impl LockedDatabase {
s.range = new_range;
}
self.auth.post_flush();
info!("Flush (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
reason, added.len(), added.iter().join(", "), deleted.len(),
self.flush_count += 1;
info!("Flush {} (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
self.flush_count, reason, added.len(), added.iter().join(", "), deleted.len(),
deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
for cb in &self.on_flush {
cb();
@ -1841,6 +1846,7 @@ impl<C: Clocks + Clone> Database<C> {
db: Some(Mutex::new(LockedDatabase {
conn,
uuid,
flush_count: 0,
open,
open_monotonic,
auth,

View File

@ -196,7 +196,8 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
(":video_samples", &r.video_samples),
(":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id),
]).with_context(|e| format!("unable to insert recording for {:#?}: {}", r, e))?;
]).with_context(|e| format!("unable to insert recording for recording {} {:#?}: {}",
id, r, e))?;
let mut stmt = tx.prepare_cached(r#"
insert into recording_integrity (composite_id, local_time_delta_90k, sample_file_sha1)

View File

@ -78,6 +78,10 @@ pub struct TestDb<C: Clocks + Clone> {
impl<C: Clocks + Clone> TestDb<C> {
/// Creates a test database with one camera.
pub fn new(clocks: C) -> Self {
Self::new_with_flush_if_sec(clocks, 0)
}
pub(crate) fn new_with_flush_if_sec(clocks: C, flush_if_sec: i64) -> Self {
let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();
let mut conn = rusqlite::Connection::open_in_memory().unwrap();
@ -100,7 +104,7 @@ impl<C: Clocks + Clone> TestDb<C> {
sample_file_dir_id: Some(sample_file_dir_id),
rtsp_path: "/main".to_owned(),
record: true,
flush_if_sec: 0,
flush_if_sec,
},
Default::default(),
],

View File

@ -89,7 +89,6 @@ impl FileWriter for ::std::fs::File {
enum SyncerCommand<F> {
AsyncSaveRecording(CompositeId, recording::Duration, F),
DatabaseFlushed,
Flush(mpsc::SyncSender<()>),
}
/// A channel which can be used to send commands to the syncer.
@ -118,10 +117,6 @@ struct PlannedFlush {
/// A human-readable reason for the flush, for logs.
reason: String,
/// Senders to drop when this time is reached. This is for test instrumentation; see
/// `SyncerChannel::flush`.
senders: Vec<mpsc::SyncSender<()>>,
}
// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
@ -179,7 +174,7 @@ where C: Clocks + Clone {
Ok((SyncerChannel(snd),
thread::Builder::new()
.name(format!("sync-{}", path))
.spawn(move || syncer.run(rcv)).unwrap()))
.spawn(move || { while syncer.iter(&rcv) {} }).unwrap()))
}
pub struct NewLimit {
@ -250,15 +245,6 @@ impl<F: FileWriter> SyncerChannel<F> {
fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) {
self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap();
}
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
/// including the next scheduled database flush (if any). Note this doesn't wait for any
/// post-database flush garbage collection.
pub fn flush(&self) {
let (snd, rcv) = mpsc::sync_channel(0);
self.0.send(SyncerCommand::Flush(snd)).unwrap();
rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
}
}
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
@ -377,48 +363,45 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
}
impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
loop {
// Wait for a command, the next flush timeout (if specified), or channel disconnect.
let next_flush = self.planned_flushes.peek().map(|f| f.when);
let cmd = match next_flush {
None => match cmds.recv() {
Err(_) => return, // all cmd senders are gone.
/// Processes a single command or timeout.
///
/// Returns true iff the loop should continue.
fn iter(&mut self, cmds: &mpsc::Receiver<SyncerCommand<D::File>>) -> bool {
// Wait for a command, the next flush timeout (if specified), or channel disconnect.
let next_flush = self.planned_flushes.peek().map(|f| f.when);
let cmd = match next_flush {
None => match cmds.recv() {
Err(_) => return false, // all cmd senders are gone.
Ok(cmd) => cmd,
},
Some(t) => {
let now = self.db.clocks().monotonic();
// Calculate the timeout to use, mapping negative durations to 0.
let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0));
match self.db.clocks().recv_timeout(&cmds, timeout) {
Err(mpsc::RecvTimeoutError::Disconnected) => return false, // cmd senders gone.
Err(mpsc::RecvTimeoutError::Timeout) => {
self.flush();
return true;
},
Ok(cmd) => cmd,
},
Some(t) => {
let now = self.db.clocks().monotonic();
}
},
};
// Calculate the timeout to use, mapping negative durations to 0.
let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0));
match cmds.recv_timeout(timeout) {
Err(mpsc::RecvTimeoutError::Disconnected) => return, // cmd senders gone.
Err(mpsc::RecvTimeoutError::Timeout) => {
self.flush();
continue
},
Ok(cmd) => cmd,
}
},
};
// Have a command; handle it.
match cmd {
SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(),
};
// Have a command; handle it.
match cmd {
SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(),
SyncerCommand::Flush(flush) => {
// The sender is waiting for the supplied writer to be dropped. If there's no
// timeout, do so immediately; otherwise wait for that timeout then drop it.
if let Some(mut f) = self.planned_flushes.peek_mut() {
f.senders.push(flush);
}
},
};
}
true
}
/// Collects garbage (without forcing a sync). Called from worker thread.
fn collect_garbage(&mut self) {
trace!("Collecting garbage");
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
@ -451,6 +434,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
/// Internal helper for `save`. This is separated out so that the question-mark operator
/// can be used in the many error paths.
fn save(&mut self, id: CompositeId, duration: recording::Duration, f: D::File) {
trace!("Processing save for {}", id);
let stream_id = id.stream();
// Free up a like number of bytes.
@ -473,11 +457,13 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
when,
reason,
recording: id,
senders: Vec::new(),
});
}
/// Flushes the database if necessary to honor `flush_if_sec` for some recording.
/// Called from worker thread when one of the `planned_flushes` arrives.
fn flush(&mut self) {
trace!("Flushing");
let mut l = self.db.lock();
// Look through the planned flushes and see if any are still relevant. It's possible
@ -808,11 +794,11 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {
#[cfg(test)]
mod tests {
use base::clock::SimulatedClocks;
use base::clock::{Clocks, SimulatedClocks};
use crate::db::{self, CompositeId};
use crate::recording;
use parking_lot::Mutex;
use log::warn;
use log::{trace, warn};
use std::collections::VecDeque;
use std::io;
use std::sync::Arc;
@ -907,12 +893,13 @@ mod tests {
_tmpdir: ::tempdir::TempDir,
dir: MockDir,
channel: super::SyncerChannel<MockFile>,
join: ::std::thread::JoinHandle<()>,
syncer: super::Syncer<SimulatedClocks, MockDir>,
syncer_rcv: mpsc::Receiver<super::SyncerCommand<MockFile>>,
}
fn new_harness() -> Harness {
fn new_harness(flush_if_sec: i64) -> Harness {
let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
let tdb = testutil::TestDb::new(clocks);
let tdb = testutil::TestDb::new_with_flush_if_sec(clocks, flush_if_sec);
let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap();
// This starts a real fs-backed syncer. Get rid of it.
@ -922,30 +909,27 @@ mod tests {
// Start a mocker syncer.
let dir = MockDir::new();
let mut syncer = super::Syncer {
let syncer = super::Syncer {
dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
dir: dir.clone(),
db: tdb.db.clone(),
planned_flushes: std::collections::BinaryHeap::new(),
};
let (snd, rcv) = mpsc::channel();
let (syncer_snd, syncer_rcv) = mpsc::channel();
tdb.db.lock().on_flush(Box::new({
let snd = snd.clone();
let snd = syncer_snd.clone();
move || if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
}
}));
let join = ::std::thread::Builder::new()
.name("mock-syncer".to_owned())
.spawn(move || syncer.run(rcv)).unwrap();
Harness {
dir_id,
dir,
db: tdb.db,
_tmpdir: tdb.tmpdir,
channel: super::SyncerChannel(snd),
join,
channel: super::SyncerChannel(syncer_snd),
syncer,
syncer_rcv,
}
}
@ -955,7 +939,7 @@ mod tests {
#[test]
fn double_flush() {
testutil::init();
let h = new_harness();
let mut h = new_harness(0);
h.db.lock().update_retention(&[db::RetentionChange {
stream_id: testutil::TEST_STREAM_ID,
new_record: true,
@ -965,110 +949,124 @@ mod tests {
// Setup: add a 3-byte recording.
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
h.channel.flush();
f.ensure_done();
h.dir.ensure_done();
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
// Then a 1-byte recording.
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
let db = h.db.clone();
move |_| {
// The drop(w) below should cause the old recording to be deleted (moved to
// garbage). When the database is flushed, the syncer forces garbage collection
// including this unlink.
// Then a 1-byte recording.
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
let db = h.db.clone();
move |_| {
// The drop(w) below should cause the old recording to be deleted (moved to
// garbage). When the database is flushed, the syncer forces garbage collection
// including this unlink.
// Do another database flush here, as if from another syncer.
db.lock().flush("another syncer running").unwrap();
Ok(())
}
})));
let (gc_done_snd, gc_done_rcv) = mpsc::channel();
h.dir.expect(MockDirAction::Sync(Box::new(move || {
gc_done_snd.send(()).unwrap();
// Do another database flush here, as if from another syncer.
db.lock().flush("another syncer running").unwrap();
Ok(())
})));
}
})));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
drop(w);
trace!("expecting AsyncSave");
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
trace!("expecting planned flush");
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
trace!("expecting DatabaseFlushed");
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
trace!("expecting DatabaseFlushed again");
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed again
f.ensure_done();
h.dir.ensure_done();
gc_done_rcv.recv().unwrap(); // Wait until the successful gc sync call...
h.channel.flush(); // ...and the DatabaseFlush op to complete.
f.ensure_done();
h.dir.ensure_done();
}
// Garbage should be marked collected on the next flush.
// Garbage should be marked collected on the next database flush.
{
let mut l = h.db.lock();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
assert!(dir.garbage_needs_unlink.is_empty());
assert!(!dir.garbage_unlinked.is_empty());
l.flush("forced gc").unwrap();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
assert!(dir.garbage_needs_unlink.is_empty());
assert!(dir.garbage_unlinked.is_empty());
}
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
h.join.join().unwrap();
assert_eq!(h.syncer_rcv.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected));
assert!(h.syncer.planned_flushes.is_empty());
}
#[test]
fn write_path_retries() {
testutil::init();
let h = new_harness();
let mut h = new_harness(0);
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"1234");
Err(eio())
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"1234");
Ok(1)
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"234");
Err(eio())
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"234");
Ok(3)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"1234", recording::Time(1), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
h.channel.flush();
f.ensure_done();
h.dir.ensure_done();
}
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"1234");
Err(eio())
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"1234");
Ok(1)
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"234");
Err(eio())
})));
f.expect(MockFileAction::Write(Box::new(|buf| {
assert_eq!(buf, b"234");
Ok(3)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"1234", recording::Time(1), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
{
let l = h.db.lock();
@ -1076,15 +1074,19 @@ mod tests {
assert_eq!(s.bytes_to_add, 0);
assert_eq!(s.sample_file_bytes, 4);
}
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
h.join.join().unwrap();
assert_eq!(h.syncer_rcv.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected));
assert!(h.syncer.planned_flushes.is_empty());
}
#[test]
fn gc_path_retries() {
testutil::init();
let h = new_harness();
let mut h = new_harness(0);
h.db.lock().update_retention(&[db::RetentionChange {
stream_id: testutil::TEST_STREAM_ID,
new_record: true,
@ -1094,76 +1096,156 @@ mod tests {
// Setup: add a 3-byte recording.
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
{
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
h.channel.flush();
f.ensure_done();
h.dir.ensure_done();
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1));
// Then a 1-byte recording.
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
let db = h.db.clone();
move |_| {
// The drop(w) below should cause the old recording to be deleted (moved to
// garbage). When the database is flushed, the syncer forces garbage collection
// including this unlink.
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
// This should have already applied the changes to sample file bytes, even
// though the garbage has yet to be collected.
let l = db.lock();
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
assert_eq!(s.bytes_to_delete, 0);
assert_eq!(s.bytes_to_add, 0);
assert_eq!(s.sample_file_bytes, 1);
Err(eio()) // force a retry.
}
})));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
let (gc_done_snd, gc_done_rcv) = mpsc::channel();
h.dir.expect(MockDirAction::Sync(Box::new(move || {
gc_done_snd.send(()).unwrap();
Ok(())
})));
// Then a 1-byte recording.
let f = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
let db = h.db.clone();
move |_| {
// The drop(w) below should cause the old recording to be deleted (moved to
// garbage). When the database is flushed, the syncer forces garbage collection
// including this unlink.
drop(w);
// This should have already applied the changes to sample file bytes, even
// though the garbage has yet to be collected.
let l = db.lock();
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
assert_eq!(s.bytes_to_delete, 0);
assert_eq!(s.bytes_to_add, 0);
assert_eq!(s.sample_file_bytes, 1);
Err(eio()) // force a retry.
}
})));
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
gc_done_rcv.recv().unwrap(); // Wait until the successful gc sync call...
h.channel.flush(); // ...and the DatabaseFlush op to complete.
f.ensure_done();
h.dir.ensure_done();
}
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
// Garbage should be marked collected on the next flush.
{
let mut l = h.db.lock();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
assert!(dir.garbage_needs_unlink.is_empty());
assert!(!dir.garbage_unlinked.is_empty());
l.flush("forced gc").unwrap();
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
assert!(dir.garbage_needs_unlink.is_empty());
assert!(dir.garbage_unlinked.is_empty());
}
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
h.join.join().unwrap();
assert_eq!(h.syncer_rcv.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected));
assert!(h.syncer.planned_flushes.is_empty());
}
#[test]
fn planned_flush() {
testutil::init();
let mut h = new_harness(60); // flush_if_sec=60
// There's a database constraint forbidding a recording starting at t=0, so advance.
h.db.clocks().sleep(time::Duration::seconds(1));
// Setup: add a 3-byte recording.
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f1 = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
Box::new({ let f = f1.clone(); move |_id| Ok(f.clone()) })));
f1.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(recording::TIME_UNITS_PER_SEC), 0, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
// Flush and let 30 seconds go by.
h.db.lock().flush("forced").unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert_eq!(h.syncer.planned_flushes.len(), 1);
h.db.clocks().sleep(time::Duration::seconds(30));
// Then, a 1-byte recording.
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
video_sample_entry_id);
let f2 = MockFile::new();
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
Box::new({ let f = f2.clone(); move |_id| Ok(f.clone()) })));
f2.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(31*recording::TIME_UNITS_PER_SEC), 1, true).unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 2);
assert_eq!(h.syncer.planned_flushes.len(), 2);
let db_flush_count_before = h.db.lock().flushes();
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0));
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush (no-op)
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0));
assert_eq!(h.db.lock().flushes(), db_flush_count_before);
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0));
assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1);
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
f1.ensure_done();
f2.ensure_done();
h.dir.ensure_done();
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
assert_eq!(h.syncer_rcv.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected));
assert!(h.syncer.planned_flushes.is_empty());
}
#[test]