diff --git a/db/db.rs b/db/db.rs
index bf90864..c0d8d27 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -589,6 +589,7 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam
 pub struct LockedDatabase {
     conn: rusqlite::Connection,
     uuid: Uuid,
+    flush_count: usize,

     /// If the database is open in read-write mode, the information about the current Open row.
     open: Option<Open>,
@@ -783,6 +784,9 @@ impl LockedDatabase {
         &self.sample_file_dirs_by_id
     }

+    /// Returns the number of completed database flushes since startup.
+    pub fn flushes(&self) -> usize { self.flush_count }
+
     /// Adds a placeholder for an uncommitted recording.
     /// The caller should write samples and fill the returned `RecordingToInsert` as it goes
     /// (noting that while holding the lock, it should not perform I/O or acquire the database
@@ -954,8 +958,9 @@ impl LockedDatabase {
             s.range = new_range;
         }
         self.auth.post_flush();
-        info!("Flush (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
-              reason, added.len(), added.iter().join(", "), deleted.len(),
+        self.flush_count += 1;
+        info!("Flush {} (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.",
+              self.flush_count, reason, added.len(), added.iter().join(", "), deleted.len(),
               deleted.iter().join(", "), gced.len(), gced.iter().join(", "));
         for cb in &self.on_flush {
             cb();
@@ -1841,6 +1846,7 @@ impl Database {
             db: Some(Mutex::new(LockedDatabase {
                 conn,
                 uuid,
+                flush_count: 0,
                 open,
                 open_monotonic,
                 auth,
diff --git a/db/raw.rs b/db/raw.rs
index 6588ddc..7216e28 100644
--- a/db/raw.rs
+++ b/db/raw.rs
@@ -196,7 +196,8 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
         (":video_samples", &r.video_samples),
         (":video_sync_samples", &r.video_sync_samples),
         (":video_sample_entry_id", &r.video_sample_entry_id),
-    ]).with_context(|e| format!("unable to insert recording for {:#?}: {}", r, e))?;
+    ]).with_context(|e| format!("unable to insert recording for recording {} {:#?}: {}",
+                                id, r, e))?;

     let mut stmt = tx.prepare_cached(r#"
         insert into recording_integrity (composite_id, local_time_delta_90k, sample_file_sha1)
diff --git a/db/testutil.rs b/db/testutil.rs
index 19c39ce..6021c53 100644
--- a/db/testutil.rs
+++ b/db/testutil.rs
@@ -78,6 +78,10 @@ pub struct TestDb {
 impl TestDb {
     /// Creates a test database with one camera.
     pub fn new(clocks: C) -> Self {
+        Self::new_with_flush_if_sec(clocks, 0)
+    }
+
+    pub(crate) fn new_with_flush_if_sec(clocks: C, flush_if_sec: i64) -> Self {
         let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();

         let mut conn = rusqlite::Connection::open_in_memory().unwrap();
@@ -100,7 +104,7 @@ impl TestDb {
                     sample_file_dir_id: Some(sample_file_dir_id),
                     rtsp_path: "/main".to_owned(),
                     record: true,
-                    flush_if_sec: 0,
+                    flush_if_sec,
                 },
                 Default::default(),
             ],
diff --git a/db/writer.rs b/db/writer.rs
index 428e195..336fbd7 100644
--- a/db/writer.rs
+++ b/db/writer.rs
@@ -89,7 +89,6 @@ impl FileWriter for ::std::fs::File {
 enum SyncerCommand<F> {
     AsyncSaveRecording(CompositeId, recording::Duration, F),
     DatabaseFlushed,
-    Flush(mpsc::SyncSender<()>),
 }

 /// A channel which can be used to send commands to the syncer.
@@ -118,10 +117,6 @@ struct PlannedFlush {

     /// A human-readable reason for the flush, for logs.
     reason: String,
-
-    /// Senders to drop when this time is reached. This is for test instrumentation; see
-    /// `SyncerChannel::flush`.
-    senders: Vec<mpsc::SyncSender<()>>,
 }

 // PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
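The new `LockedDatabase::flushes()` counter is what lets the reworked tests below observe whether a
database flush completed, now that `SyncerChannel::flush` is removed. A minimal sketch of that
pattern — hypothetical, not part of the patch, assuming the `TestDb` and `SimulatedClocks` helpers
already used by db/writer.rs's tests:

    // Hypothetical usage sketch: watch the flush counter around an explicit flush.
    // Assumes db/testutil.rs's TestDb and base::clock::SimulatedClocks as in the tests below.
    use base::clock::SimulatedClocks;
    use crate::testutil;

    let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
    let tdb = testutil::TestDb::new_with_flush_if_sec(clocks, 0);
    let before = tdb.db.lock().flushes();            // completed flushes so far
    tdb.db.lock().flush("example reason").unwrap();  // the reason string is only logged
    assert_eq!(tdb.db.lock().flushes(), before + 1);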
@@ -179,7 +174,7 @@ where C: Clocks + Clone {
     Ok((SyncerChannel(snd),
         thread::Builder::new()
             .name(format!("sync-{}", path))
-            .spawn(move || syncer.run(rcv)).unwrap()))
+            .spawn(move || { while syncer.iter(&rcv) {} }).unwrap()))
 }

 pub struct NewLimit {
@@ -250,15 +245,6 @@ impl SyncerChannel {
     fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) {
         self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap();
     }
-
-    /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
-    /// including the next scheduled database flush (if any). Note this doesn't wait for any
-    /// post-database flush garbage collection.
-    pub fn flush(&self) {
-        let (snd, rcv) = mpsc::sync_channel(0);
-        self.0.send(SyncerCommand::Flush(snd)).unwrap();
-        rcv.recv().unwrap_err();  // syncer should just drop the channel, closing it.
-    }
 }

 /// Lists files which should be "abandoned" (deleted without ever recording in the database)
@@ -377,48 +363,45 @@ impl Syncer> {
 }

 impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
-    fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
-        loop {
-            // Wait for a command, the next flush timeout (if specified), or channel disconnect.
-            let next_flush = self.planned_flushes.peek().map(|f| f.when);
-            let cmd = match next_flush {
-                None => match cmds.recv() {
-                    Err(_) => return,  // all cmd senders are gone.
+    /// Processes a single command or timeout.
+    ///
+    /// Returns true iff the loop should continue.
+    fn iter(&mut self, cmds: &mpsc::Receiver<SyncerCommand<D::File>>) -> bool {
+        // Wait for a command, the next flush timeout (if specified), or channel disconnect.
+        let next_flush = self.planned_flushes.peek().map(|f| f.when);
+        let cmd = match next_flush {
+            None => match cmds.recv() {
+                Err(_) => return false,  // all cmd senders are gone.
+                Ok(cmd) => cmd,
+            },
+            Some(t) => {
+                let now = self.db.clocks().monotonic();
+
+                // Calculate the timeout to use, mapping negative durations to 0.
+                let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0));
+                match self.db.clocks().recv_timeout(&cmds, timeout) {
+                    Err(mpsc::RecvTimeoutError::Disconnected) => return false,  // cmd senders gone.
+                    Err(mpsc::RecvTimeoutError::Timeout) => {
+                        self.flush();
+                        return true;
+                    },
                     Ok(cmd) => cmd,
-                },
-                Some(t) => {
-                    let now = self.db.clocks().monotonic();
+                }
+            },
+        };

-                    // Calculate the timeout to use, mapping negative durations to 0.
-                    let timeout = (t - now).to_std().unwrap_or(StdDuration::new(0, 0));
-                    match cmds.recv_timeout(timeout) {
-                        Err(mpsc::RecvTimeoutError::Disconnected) => return,  // cmd senders gone.
-                        Err(mpsc::RecvTimeoutError::Timeout) => {
-                            self.flush();
-                            continue
-                        },
-                        Ok(cmd) => cmd,
-                    }
-                },
-            };
+        // Have a command; handle it.
+        match cmd {
+            SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
+            SyncerCommand::DatabaseFlushed => self.collect_garbage(),
+        };

-            // Have a command; handle it.
-            match cmd {
-                SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
-                SyncerCommand::DatabaseFlushed => self.collect_garbage(),
-                SyncerCommand::Flush(flush) => {
-                    // The sender is waiting for the supplied writer to be dropped. If there's no
-                    // timeout, do so immediately; otherwise wait for that timeout then drop it.
-                    if let Some(mut f) = self.planned_flushes.peek_mut() {
-                        f.senders.push(flush);
-                    }
-                },
-            };
-        }
+        true
     }

     /// Collects garbage (without forcing a sync). Called from worker thread.
     fn collect_garbage(&mut self) {
+        trace!("Collecting garbage");
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
             let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
@@ -451,6 +434,7 @@ impl Syncer {
     /// Internal helper for `save`. This is separated out so that the question-mark operator
     /// can be used in the many error paths.
     fn save(&mut self, id: CompositeId, duration: recording::Duration, f: D::File) {
+        trace!("Processing save for {}", id);
         let stream_id = id.stream();

         // Free up a like number of bytes.
@@ -473,11 +457,13 @@ impl Syncer {
             when,
             reason,
             recording: id,
-            senders: Vec::new(),
         });
     }

+    /// Flushes the database if necessary to honor `flush_if_sec` for some recording.
+    /// Called from worker thread when one of the `planned_flushes` arrives.
     fn flush(&mut self) {
+        trace!("Flushing");
         let mut l = self.db.lock();

         // Look through the planned flushes and see if any are still relevant. It's possible
@@ -808,11 +794,11 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {

 #[cfg(test)]
 mod tests {
-    use base::clock::SimulatedClocks;
+    use base::clock::{Clocks, SimulatedClocks};
     use crate::db::{self, CompositeId};
     use crate::recording;
     use parking_lot::Mutex;
-    use log::warn;
+    use log::{trace, warn};
     use std::collections::VecDeque;
     use std::io;
     use std::sync::Arc;
@@ -907,12 +893,13 @@ mod tests {
         _tmpdir: ::tempdir::TempDir,
         dir: MockDir,
         channel: super::SyncerChannel<MockFile>,
-        join: ::std::thread::JoinHandle<()>,
+        syncer: super::Syncer<SimulatedClocks, MockDir>,
+        syncer_rcv: mpsc::Receiver<super::SyncerCommand<MockFile>>,
     }

-    fn new_harness() -> Harness {
+    fn new_harness(flush_if_sec: i64) -> Harness {
         let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
-        let tdb = testutil::TestDb::new(clocks);
+        let tdb = testutil::TestDb::new_with_flush_if_sec(clocks, flush_if_sec);
         let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap();

         // This starts a real fs-backed syncer. Get rid of it.
@@ -922,30 +909,27 @@ mod tests {

         // Start a mocker syncer.
         let dir = MockDir::new();
-        let mut syncer = super::Syncer {
+        let syncer = super::Syncer {
             dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
             dir: dir.clone(),
             db: tdb.db.clone(),
             planned_flushes: std::collections::BinaryHeap::new(),
         };
-        let (snd, rcv) = mpsc::channel();
+        let (syncer_snd, syncer_rcv) = mpsc::channel();
         tdb.db.lock().on_flush(Box::new({
-            let snd = snd.clone();
+            let snd = syncer_snd.clone();
             move || if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
                 warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
             }
         }));
-        let join = ::std::thread::Builder::new()
-            .name("mock-syncer".to_owned())
-            .spawn(move || syncer.run(rcv)).unwrap();
-
         Harness {
             dir_id,
             dir,
             db: tdb.db,
             _tmpdir: tdb.tmpdir,
-            channel: super::SyncerChannel(snd),
-            join,
+            channel: super::SyncerChannel(syncer_snd),
+            syncer,
+            syncer_rcv,
         }
     }

@@ -955,7 +939,7 @@
     #[test]
     fn double_flush() {
         testutil::init();
-        let h = new_harness();
+        let mut h = new_harness(0);
         h.db.lock().update_retention(&[db::RetentionChange {
             stream_id: testutil::TEST_STREAM_ID,
             new_record: true,
@@ -965,110 +949,124 @@
         // Setup: add a 3-byte recording.
         let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
             1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
-        {
-            let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
-                                    video_sample_entry_id);
-            let f = MockFile::new();
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
-                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
-            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-            w.write(b"123", recording::Time(2), 0, true).unwrap();
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
-            w.close(Some(1));
-            h.channel.flush();
-            f.ensure_done();
-            h.dir.ensure_done();
+        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                video_sample_entry_id);
+        let f = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                     Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"123", recording::Time(2), 0, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        w.close(Some(1));
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        f.ensure_done();
+        h.dir.ensure_done();

-            // Then a 1-byte recording.
-            let f = MockFile::new();
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
-                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
-            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-            w.write(b"4", recording::Time(3), 1, true).unwrap();
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
-            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
-                let db = h.db.clone();
-                move |_| {
-                    // The drop(w) below should cause the old recording to be deleted (moved to
-                    // garbage). When the database is flushed, the syncer forces garbage collection
-                    // including this unlink.
+        // Then a 1-byte recording.
+        let f = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                     Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"4", recording::Time(3), 1, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
+            let db = h.db.clone();
+            move |_| {
+                // The drop(w) below should cause the old recording to be deleted (moved to
+                // garbage). When the database is flushed, the syncer forces garbage collection
+                // including this unlink.

-                    // Do another database flush here, as if from another syncer.
-                    db.lock().flush("another syncer running").unwrap();
-                    Ok(())
-                }
-            })));
-            let (gc_done_snd, gc_done_rcv) = mpsc::channel();
-            h.dir.expect(MockDirAction::Sync(Box::new(move || {
-                gc_done_snd.send(()).unwrap();
+                // Do another database flush here, as if from another syncer.
+                db.lock().flush("another syncer running").unwrap();
                 Ok(())
-            })));
+            }
+        })));
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        drop(w);

-            drop(w);
+        trace!("expecting AsyncSave");
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        trace!("expecting planned flush");
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        trace!("expecting DatabaseFlushed");
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        trace!("expecting DatabaseFlushed again");
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed again
+        f.ensure_done();
+        h.dir.ensure_done();

-            gc_done_rcv.recv().unwrap();  // Wait until the successful gc sync call...
-            h.channel.flush();  // ...and the DatabaseFlush op to complete.
-            f.ensure_done();
-            h.dir.ensure_done();
-        }
-
-        // Garbage should be marked collected on the next flush.
+        // Garbage should be marked collected on the next database flush.
         {
             let mut l = h.db.lock();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
-            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
+            assert!(dir.garbage_needs_unlink.is_empty());
+            assert!(!dir.garbage_unlinked.is_empty());
             l.flush("forced gc").unwrap();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
+            assert!(dir.garbage_needs_unlink.is_empty());
+            assert!(dir.garbage_unlinked.is_empty());
         }
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+
+        // The syncer should shut down cleanly.
         drop(h.channel);
         h.db.lock().clear_on_flush();
-        h.join.join().unwrap();
+        assert_eq!(h.syncer_rcv.try_recv().err(),
+                   Some(std::sync::mpsc::TryRecvError::Disconnected));
+        assert!(h.syncer.planned_flushes.is_empty());
     }

     #[test]
     fn write_path_retries() {
         testutil::init();
-        let h = new_harness();
+        let mut h = new_harness(0);
         let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
             1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
-        {
-            let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
-                                    video_sample_entry_id);
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
-            let f = MockFile::new();
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
-                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
-            f.expect(MockFileAction::Write(Box::new(|buf| {
-                assert_eq!(buf, b"1234");
-                Err(eio())
-            })));
-            f.expect(MockFileAction::Write(Box::new(|buf| {
-                assert_eq!(buf, b"1234");
-                Ok(1)
-            })));
-            f.expect(MockFileAction::Write(Box::new(|buf| {
-                assert_eq!(buf, b"234");
-                Err(eio())
-            })));
-            f.expect(MockFileAction::Write(Box::new(|buf| {
-                assert_eq!(buf, b"234");
-                Ok(3)
-            })));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-            w.write(b"1234", recording::Time(1), 0, true).unwrap();
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
-            drop(w);
-            h.channel.flush();
-            f.ensure_done();
-            h.dir.ensure_done();
-        }
+        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                video_sample_entry_id);
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
+        let f = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                     Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+        f.expect(MockFileAction::Write(Box::new(|buf| {
+            assert_eq!(buf, b"1234");
+            Err(eio())
+        })));
+        f.expect(MockFileAction::Write(Box::new(|buf| {
+            assert_eq!(buf, b"1234");
+            Ok(1)
+        })));
+        f.expect(MockFileAction::Write(Box::new(|buf| {
+            assert_eq!(buf, b"234");
+            Err(eio())
+        })));
+        f.expect(MockFileAction::Write(Box::new(|buf| {
+            assert_eq!(buf, b"234");
+            Ok(3)
+        })));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"1234", recording::Time(1), 0, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        drop(w);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        f.ensure_done();
+        h.dir.ensure_done();

         {
             let l = h.db.lock();
@@ -1076,15 +1074,19 @@ mod tests {
             assert_eq!(s.bytes_to_add, 0);
             assert_eq!(s.sample_file_bytes, 4);
         }
+
+        // The syncer should shut down cleanly.
         drop(h.channel);
         h.db.lock().clear_on_flush();
-        h.join.join().unwrap();
+        assert_eq!(h.syncer_rcv.try_recv().err(),
+                   Some(std::sync::mpsc::TryRecvError::Disconnected));
+        assert!(h.syncer.planned_flushes.is_empty());
     }

     #[test]
     fn gc_path_retries() {
         testutil::init();
-        let h = new_harness();
+        let mut h = new_harness(0);
         h.db.lock().update_retention(&[db::RetentionChange {
             stream_id: testutil::TEST_STREAM_ID,
             new_record: true,
@@ -1094,76 +1096,156 @@
         // Setup: add a 3-byte recording.
         let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
             1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
-        {
-            let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
-                                    video_sample_entry_id);
-            let f = MockFile::new();
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
-                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
-            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-            w.write(b"123", recording::Time(2), 0, true).unwrap();
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
-            w.close(Some(1));
-            h.channel.flush();
-            f.ensure_done();
-            h.dir.ensure_done();
+        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                video_sample_entry_id);
+        let f = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                     Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"123", recording::Time(2), 0, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        w.close(Some(1));

-            // Then a 1-byte recording.
-            let f = MockFile::new();
-            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
-                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
-            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
-            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-            w.write(b"4", recording::Time(3), 1, true).unwrap();
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
-            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
-                let db = h.db.clone();
-                move |_| {
-                    // The drop(w) below should cause the old recording to be deleted (moved to
-                    // garbage). When the database is flushed, the syncer forces garbage collection
-                    // including this unlink.
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        f.ensure_done();
+        h.dir.ensure_done();

-                    // This should have already applied the changes to sample file bytes, even
-                    // though the garbage has yet to be collected.
-                    let l = db.lock();
-                    let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
-                    assert_eq!(s.bytes_to_delete, 0);
-                    assert_eq!(s.bytes_to_add, 0);
-                    assert_eq!(s.sample_file_bytes, 1);
-                    Err(eio())  // force a retry.
-                }
-            })));
-            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
-            h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
-            let (gc_done_snd, gc_done_rcv) = mpsc::channel();
-            h.dir.expect(MockDirAction::Sync(Box::new(move || {
-                gc_done_snd.send(()).unwrap();
-                Ok(())
-            })));
+        // Then a 1-byte recording.
+        let f = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                     Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"4", recording::Time(3), 1, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
+            let db = h.db.clone();
+            move |_| {
+                // The drop(w) below should cause the old recording to be deleted (moved to
+                // garbage). When the database is flushed, the syncer forces garbage collection
+                // including this unlink.

-            drop(w);
+                // This should have already applied the changes to sample file bytes, even
+                // though the garbage has yet to be collected.
+                let l = db.lock();
+                let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
+                assert_eq!(s.bytes_to_delete, 0);
+                assert_eq!(s.bytes_to_add, 0);
+                assert_eq!(s.sample_file_bytes, 1);
+                Err(eio())  // force a retry.
+            }
+        })));
+        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));

-            gc_done_rcv.recv().unwrap();  // Wait until the successful gc sync call...
-            h.channel.flush();  // ...and the DatabaseFlush op to complete.
-            f.ensure_done();
-            h.dir.ensure_done();
-        }
+        drop(w);
+
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        f.ensure_done();
+        h.dir.ensure_done();

         // Garbage should be marked collected on the next flush.
         {
             let mut l = h.db.lock();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
-            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
+            assert!(dir.garbage_needs_unlink.is_empty());
+            assert!(!dir.garbage_unlinked.is_empty());
             l.flush("forced gc").unwrap();
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_needs_unlink.is_empty());
-            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage_unlinked.is_empty());
+            let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
+            assert!(dir.garbage_needs_unlink.is_empty());
+            assert!(dir.garbage_unlinked.is_empty());
         }
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+
+        // The syncer should shut down cleanly.
         drop(h.channel);
         h.db.lock().clear_on_flush();
-        h.join.join().unwrap();
+        assert_eq!(h.syncer_rcv.try_recv().err(),
+                   Some(std::sync::mpsc::TryRecvError::Disconnected));
+        assert!(h.syncer.planned_flushes.is_empty());
+    }
+
+    #[test]
+    fn planned_flush() {
+        testutil::init();
+        let mut h = new_harness(60);  // flush_if_sec=60
+
+        // There's a database constraint forbidding a recording starting at t=0, so advance.
+        h.db.clocks().sleep(time::Duration::seconds(1));
+
+        // Setup: add a 3-byte recording.
+        let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
+            1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
+        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                video_sample_entry_id);
+        let f1 = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                     Box::new({ let f = f1.clone(); move |_id| Ok(f.clone()) })));
+        f1.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+        f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"123", recording::Time(recording::TIME_UNITS_PER_SEC), 0, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+        drop(w);
+
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+
+        // Flush and let 30 seconds go by.
+        h.db.lock().flush("forced").unwrap();
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        h.db.clocks().sleep(time::Duration::seconds(30));
+
+        // Then, a 1-byte recording.
+        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                video_sample_entry_id);
+        let f2 = MockFile::new();
+        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                     Box::new({ let f = f2.clone(); move |_id| Ok(f.clone()) })));
+        f2.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+        f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+        w.write(b"4", recording::Time(31*recording::TIME_UNITS_PER_SEC), 1, true).unwrap();
+        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+
+        drop(w);
+
+        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert_eq!(h.syncer.planned_flushes.len(), 2);
+
+        assert_eq!(h.syncer.planned_flushes.len(), 2);
+        let db_flush_count_before = h.db.lock().flushes();
+        assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0));
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush (no-op)
+        assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0));
+        assert_eq!(h.db.lock().flushes(), db_flush_count_before);
+        assert_eq!(h.syncer.planned_flushes.len(), 1);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0));
+        assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1);
+        assert_eq!(h.syncer.planned_flushes.len(), 0);
+        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+
+        f1.ensure_done();
+        f2.ensure_done();
+        h.dir.ensure_done();
+
+        // The syncer should shut down cleanly.
+        drop(h.channel);
+        h.db.lock().clear_on_flush();
+        assert_eq!(h.syncer_rcv.try_recv().err(),
+                   Some(std::sync::mpsc::TryRecvError::Disconnected));
+        assert!(h.syncer.planned_flushes.is_empty());
     }

     #[test]