diff --git a/db/writer.rs b/db/writer.rs index 336fbd7..064c2cf 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -89,6 +89,7 @@ impl FileWriter for ::std::fs::File { enum SyncerCommand { AsyncSaveRecording(CompositeId, recording::Duration, F), DatabaseFlushed, + Flush(mpsc::SyncSender<()>), } /// A channel which can be used to send commands to the syncer. @@ -117,6 +118,10 @@ struct PlannedFlush { /// A human-readable reason for the flush, for logs. reason: String, + + /// Senders to drop when this time is reached. This is for test instrumentation; see + /// `SyncerChannel::flush`. + senders: Vec>, } // PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This @@ -245,6 +250,15 @@ impl SyncerChannel { fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) { self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap(); } + + /// For testing: flushes the syncer, waiting for all currently-queued commands to complete, + /// including the next scheduled database flush (if any). Note this doesn't wait for any + /// post-database flush garbage collection. + pub fn flush(&self) { + let (snd, rcv) = mpsc::sync_channel(0); + self.0.send(SyncerCommand::Flush(snd)).unwrap(); + rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it. + } } /// Lists files which should be "abandoned" (deleted without ever recording in the database) @@ -394,6 +408,13 @@ impl Syncer { match cmd { SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f), SyncerCommand::DatabaseFlushed => self.collect_garbage(), + SyncerCommand::Flush(flush) => { + // The sender is waiting for the supplied writer to be dropped. If there's no + // timeout, do so immediately; otherwise wait for that timeout then drop it. + if let Some(mut f) = self.planned_flushes.peek_mut() { + f.senders.push(flush); + } + }, }; true @@ -457,6 +478,7 @@ impl Syncer { when, reason, recording: id, + senders: Vec::new(), }); }