fix streamer tests broken by 4cc796f

That commit rewrote the db/writer.rs tests. But the flush op they
used before was also used by src/streamer.rs tests. Reintroduce it.
This commit is contained in:
Scott Lamb 2019-01-06 07:07:04 -08:00
parent 4cc796f697
commit b9e6a6461f

View File

@ -89,6 +89,7 @@ impl FileWriter for ::std::fs::File {
enum SyncerCommand<F> {
AsyncSaveRecording(CompositeId, recording::Duration, F),
DatabaseFlushed,
Flush(mpsc::SyncSender<()>),
}
/// A channel which can be used to send commands to the syncer.
@ -117,6 +118,10 @@ struct PlannedFlush {
/// A human-readable reason for the flush, for logs.
reason: String,
/// Senders to drop when this time is reached. This is for test instrumentation; see
/// `SyncerChannel::flush`.
senders: Vec<mpsc::SyncSender<()>>,
}
// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
@ -245,6 +250,15 @@ impl<F: FileWriter> SyncerChannel<F> {
fn async_save_recording(&self, id: CompositeId, duration: recording::Duration, f: F) {
self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap();
}
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
/// including the next scheduled database flush (if any). Note this doesn't wait for any
/// post-database flush garbage collection.
pub fn flush(&self) {
let (snd, rcv) = mpsc::sync_channel(0);
self.0.send(SyncerCommand::Flush(snd)).unwrap();
rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
}
}
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
@ -394,6 +408,13 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
match cmd {
SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(),
SyncerCommand::Flush(flush) => {
// The sender is waiting for the supplied writer to be dropped. If there's no
// timeout, do so immediately; otherwise wait for that timeout then drop it.
if let Some(mut f) = self.planned_flushes.peek_mut() {
f.senders.push(flush);
}
},
};
true
@ -457,6 +478,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
when,
reason,
recording: id,
senders: Vec::new(),
});
}