diff --git a/db/writer.rs b/db/writer.rs index 7cfd36d..bd2afdd 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -104,8 +104,11 @@ struct Syncer { dir: D, db: Arc>, - /// Information about the next scheduled flush: monotonic time and reason. - next_flush: Option<(Timespec, String)>, + /// Information about the next scheduled flush: + /// * monotonic time + /// * reason (for logging) + /// * senders to drop when this time is reached (for testing; see SyncerChannel::flush). + next_flush: Option<(Timespec, String, Vec>)>, } /// Starts a syncer for the given sample file directory. @@ -213,7 +216,9 @@ impl SyncerChannel { self.0.send(SyncerCommand::AsyncSaveRecording(id, duration, f)).unwrap(); } - /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. + /// For testing: flushes the syncer, waiting for all currently-queued commands to complete, + /// including a scheduled database flush if any. Note this doesn't wait for any + /// post-database flush garbage collection. pub fn flush(&self) { let (snd, rcv) = mpsc::sync_channel(0); self.0.send(SyncerCommand::Flush(snd)).unwrap(); @@ -346,7 +351,10 @@ impl Syncer { Err(_) => return, // all senders are gone. Ok(cmd) => cmd, }, - Some((t, r)) => { + Some((t, r, flushes)) => { + // Note: `flushes` will be dropped on exit from this block, which has the desired + // behavior of closing the channel. + let now = self.db.clocks().monotonic(); // Calculate the timeout to use, mapping negative durations to 0. @@ -358,7 +366,7 @@ impl Syncer { continue }, Ok(cmd) => { - self.next_flush = Some((t, r)); + self.next_flush = Some((t, r, flushes)); cmd }, } @@ -369,7 +377,13 @@ impl Syncer { match cmd { SyncerCommand::AsyncSaveRecording(id, dur, f) => self.save(id, dur, f), SyncerCommand::DatabaseFlushed => self.collect_garbage(), - SyncerCommand::Flush(_) => {}, // just drop the supplied sender, closing it. + SyncerCommand::Flush(flush) => { + // The sender is waiting for the supplied writer to be dropped. If there's no + // timeout, do so immediately; otherwise wait for that timeout then drop it. + if let Some((_, _, ref mut flushes)) = self.next_flush { + flushes.push(flush); + } + }, }; } } @@ -423,7 +437,7 @@ impl Syncer { let how_soon = Duration::seconds(s.flush_if_sec) - duration.to_tm_duration(); let now = self.db.clocks().monotonic(); let t = now + how_soon; - if let Some((nft, ref r)) = self.next_flush { + if let Some((nft, ref r, _)) = self.next_flush { if nft <= t { trace!("{}-{}: not scheduling flush in {}; there's already one in {}: {}", c.short_name, s.type_.as_str(), how_soon, nft - now, &r); @@ -433,7 +447,7 @@ impl Syncer { let reason = format!("{} sec after start of {} {}-{} recording", s.flush_if_sec, duration, c.short_name, s.type_.as_str()); trace!("scheduling flush in {} because {}", how_soon, &reason); - self.next_flush = Some((t, reason)); + self.next_flush = Some((t, reason, Vec::new())); } fn flush(&mut self, reason: &str) { @@ -441,7 +455,7 @@ impl Syncer { let d = Duration::minutes(1); warn!("flush failure on save for reason {}; will retry after {}: {:?}", reason, d, e); let t = self.db.clocks().monotonic() + Duration::minutes(1); - self.next_flush = Some((t, "retry after flush failure".to_owned())); + self.next_flush = Some((t, "retry after flush failure".to_owned(), Vec::new())); } } }