deflake writer tests

There was a race condition here because it wasn't waiting for the db
flush to complete. This made write_path_retries sometimes not reflect
the consequence of the flush, causing an assertion failure. I assume it
was also responsible for gc_path_retries timeouts under travis-ci.
This commit is contained in:
Scott Lamb 2018-08-07 21:54:25 -05:00
parent f3127f563a
commit d7a94956eb

View File

@ -104,8 +104,11 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
dir: D, dir: D,
db: Arc<db::Database<C>>, db: Arc<db::Database<C>>,
/// Information about the next scheduled flush: monotonic time and reason. /// Information about the next scheduled flush:
next_flush: Option<(Timespec, String)>, /// * monotonic time
/// * reason (for logging)
/// * senders to drop when this time is reached (for testing; see SyncerChannel::flush).
next_flush: Option<(Timespec, String, Vec<mpsc::SyncSender<()>>)>,
} }
/// Starts a syncer for the given sample file directory. /// Starts a syncer for the given sample file directory.
@ -213,7 +216,9 @@ impl<F: FileWriter> SyncerChannel<F> {
self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap(); self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap();
} }
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete. /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
/// including a scheduled database flush if any. Note this doesn't wait for any
/// post-database flush garbage collection.
pub fn flush(&self) { pub fn flush(&self) {
let (snd, rcv) = mpsc::sync_channel(0); let (snd, rcv) = mpsc::sync_channel(0);
self.0.send(SyncerCommand::Flush(snd)).unwrap(); self.0.send(SyncerCommand::Flush(snd)).unwrap();
@ -346,7 +351,10 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
Err(_) => return, // all senders are gone. Err(_) => return, // all senders are gone.
Ok(cmd) => cmd, Ok(cmd) => cmd,
}, },
Some((t, r)) => { Some((t, r, flushes)) => {
// Note: `flushes` will be dropped on exit from this block, which has the desired
// behavior of closing the channel.
let now = self.db.clocks().monotonic(); let now = self.db.clocks().monotonic();
// Calculate the timeout to use, mapping negative durations to 0. // Calculate the timeout to use, mapping negative durations to 0.
@ -358,7 +366,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
continue continue
}, },
Ok(cmd) => { Ok(cmd) => {
self.next_flush = Some((t, r)); self.next_flush = Some((t, r, flushes));
cmd cmd
}, },
} }
@ -369,7 +377,13 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
match cmd { match cmd {
SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f), SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(), SyncerCommand::DatabaseFlushed => self.collect_garbage(),
SyncerCommand::Flush(_) => {}, // just drop the supplied sender, closing it. SyncerCommand::Flush(flush) => {
// The sender is waiting for the supplied writer to be dropped. If there's no
// timeout, do so immediately; otherwise wait for that timeout then drop it.
if let Some((_, _, ref mut flushes)) = self.next_flush {
flushes.push(flush);
}
},
}; };
} }
} }
@ -423,7 +437,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration(); let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration();
let now = self.db.clocks().monotonic(); let now = self.db.clocks().monotonic();
let t = now + how_soon; let t = now + how_soon;
if let Some((nft, ref r)) = self.next_flush { if let Some((nft, ref r, _)) = self.next_flush {
if nft <= t { if nft <= t {
trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}", trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}",
c.short_name, s.type_.as_str(), how_soon, nft - now, &r); c.short_name, s.type_.as_str(), how_soon, nft - now, &r);
@ -433,7 +447,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
let reason = format!("{} sec after start of {} {}-{} recording", let reason = format!("{} sec after start of {} {}-{} recording",
s.flush_if_sec, duration, c.short_name, s.type_.as_str()); s.flush_if_sec, duration, c.short_name, s.type_.as_str());
trace!("scheduling flush in {} because {}", how_soon, &reason); trace!("scheduling flush in {} because {}", how_soon, &reason);
self.next_flush = Some((t, reason)); self.next_flush = Some((t, reason, Vec::new()));
} }
fn flush(&mut self, reason: &str) { fn flush(&mut self, reason: &str) {
@ -441,7 +455,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
let d = Duration::minutes(1); let d = Duration::minutes(1);
warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e); warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e);
let t = self.db.clocks().monotonic() + Duration::minutes(1); let t = self.db.clocks().monotonic() + Duration::minutes(1);
self.next_flush = Some((t, "retry after flush failure".to_owned())); self.next_flush = Some((t, "retry after flush failure".to_owned(), Vec::new()));
} }
} }
} }