shutdown better

After a frustrating search for a suitable shutdown channel
(tokio::sync::watch::Receiver and
futures::future::Shared<tokio::sync::oneshot::Receiver> didn't look quite
right), during which I rethought my life decisions, I finally just made my
own (server/base/shutdown.rs). It's easy to poll or wait on from both sync
and async contexts. Most importantly, it's convenient; it's also efficient,
not that efficiency really matters here.
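
In rough terms, it's a sender whose drop requests shutdown plus a cheaply
cloneable receiver that can be polled or waited on. Below is a simplified
stand-in built only on std primitives; Sender, Receiver, ShutdownError, and
channel() appear in this commit, but the check/wait methods and the whole
implementation here are illustrative, not the actual server/base/shutdown.rs
(which, among other things, also supports waiting from async code).

    use std::sync::{Arc, Condvar, Mutex};

    #[derive(Debug)]
    pub struct ShutdownError;

    struct Shared {
        requested: Mutex<bool>,
        condvar: Condvar,
    }

    /// Requests shutdown when dropped.
    pub struct Sender(Arc<Shared>);

    impl Drop for Sender {
        fn drop(&mut self) {
            *self.0.requested.lock().unwrap() = true;
            self.0.condvar.notify_all();
        }
    }

    /// Cloneable handle handed to every thread that should notice shutdown.
    #[derive(Clone)]
    pub struct Receiver(Arc<Shared>);

    impl Receiver {
        /// Non-blocking poll, convenient inside loops and retry helpers.
        pub fn check(&self) -> Result<(), ShutdownError> {
            if *self.0.requested.lock().unwrap() {
                Err(ShutdownError)
            } else {
                Ok(())
            }
        }

        /// Blocks the calling thread until shutdown is requested.
        pub fn wait(&self) {
            let mut requested = self.0.requested.lock().unwrap();
            while !*requested {
                requested = self.0.condvar.wait(requested).unwrap();
            }
        }
    }

    pub fn channel() -> (Sender, Receiver) {
        let shared = Arc::new(Shared {
            requested: Mutex::new(false),
            condvar: Condvar::new(),
        });
        (Sender(shared.clone()), Receiver(shared))
    }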

We now do a slightly better job of propagating a "graceful" shutdown
signal, and this channel will give us tools to improve it over time.
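
Concretely, propagation can be as simple as handing each worker thread a
clone of the receiver and dropping the sender when it's time to stop. A
usage sketch building on the channel sketch above (the worker loop is made
up):

    fn main() {
        let (shutdown_tx, shutdown_rx) = channel();
        let worker = std::thread::spawn({
            let shutdown_rx = shutdown_rx.clone();
            move || {
                // One unit of work per iteration; give up promptly on shutdown.
                while shutdown_rx.check().is_ok() {
                    std::thread::sleep(std::time::Duration::from_millis(100));
                }
            }
        });
        // ... run the rest of the program ...
        drop(shutdown_tx); // request a graceful shutdown
        worker.join().unwrap();
    }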

* Shut down even when writer or syncer operations are stuck: their retry
  loops now watch the shutdown receiver (see the sketch after this list).
  Fixes #117
* Not done yet: streamers should shut down instantly rather than waiting
  out a connection attempt or the next frame. I'll probably implement that
  when removing --rtsp-library=ffmpeg; the code should be cleaner then.
* Not done yet: fix a couple of places that sleep for up to a second when
  they could shut down immediately. That just needs plumbing so mock clocks
  work there.
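
The first bullet works by threading the receiver into what used to be
retry_forever loops. A hedged sketch of that idea, reusing the channel
sketch above (the real clock::retry in this commit also takes a Clocks
handle so tests can use simulated time; this standalone version just sleeps
on the wall clock):

    use std::time::Duration;

    /// Retries `f` until it succeeds or shutdown is requested.
    fn retry<T, E: std::fmt::Debug>(
        shutdown_rx: &Receiver,
        f: &mut dyn FnMut() -> Result<T, E>,
    ) -> Result<T, ShutdownError> {
        loop {
            shutdown_rx.check()?; // bail out instead of spinning forever
            match f() {
                Ok(v) => return Ok(v),
                Err(e) => {
                    eprintln!("operation failed: {:?}; retrying in 1 sec", e);
                    std::thread::sleep(Duration::from_secs(1));
                }
            }
        }
    }

The one-second sleep between attempts is exactly the kind of delay the last
bullet is about: with mock-clock plumbing in place, shutdown could interrupt
that sleep too.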

I also implemented an immediate shutdown mode, activated by a second
signal. I think this will mitigate the streamer wait situation.
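
One way the two-signal policy could be wired up (a sketch, not this commit's
actual signal handling; it assumes a tokio runtime with the "signal" feature
and reuses the Sender from the sketch above):

    async fn handle_signals(shutdown_tx: Sender) {
        // First ^C: request a graceful shutdown by dropping the sender.
        tokio::signal::ctrl_c().await.expect("install ^C handler");
        eprintln!("shutting down gracefully; press ^C again to exit immediately");
        drop(shutdown_tx); // every Receiver clone now reports shutdown

        // Second ^C: stop waiting for a graceful exit and leave right away.
        tokio::signal::ctrl_c().await.expect("install ^C handler");
        eprintln!("exiting immediately");
        std::process::exit(1);
    }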
Author: Scott Lamb
Date: 2021-09-23 15:55:53 -07:00
Parent: 66f76079c0
Commit: b41a6c43da
21 changed files with 487 additions and 151 deletions

@@ -51,6 +51,8 @@ pub fn init() {
pub struct TestDb<C: Clocks + Clone> {
pub db: Arc<db::Database<C>>,
pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
pub shutdown_tx: base::shutdown::Sender,
pub shutdown_rx: base::shutdown::Receiver,
pub syncer_channel: writer::SyncerChannel<::std::fs::File>,
pub syncer_join: thread::JoinHandle<()>,
pub tmpdir: TempDir,
@@ -114,11 +116,14 @@ impl<C: Clocks + Clone> TestDb<C> {
}
let mut dirs_by_stream_id = FnvHashMap::default();
dirs_by_stream_id.insert(TEST_STREAM_ID, dir);
let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
let (syncer_channel, syncer_join) =
writer::start_syncer(db.clone(), sample_file_dir_id).unwrap();
writer::start_syncer(db.clone(), shutdown_rx.clone(), sample_file_dir_id).unwrap();
TestDb {
db,
dirs_by_stream_id: Arc::new(dirs_by_stream_id),
shutdown_tx,
shutdown_rx,
syncer_channel,
syncer_join,
tmpdir,

@@ -8,6 +8,7 @@ use crate::db::{self, CompositeId};
use crate::dir;
use crate::recording::{self, MAX_RECORDING_WALL_DURATION};
use base::clock::{self, Clocks};
use base::shutdown::ShutdownError;
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
use log::{debug, trace, warn};
@@ -95,6 +96,7 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
dir: D,
db: Arc<db::Database<C>>,
planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
shutdown_rx: base::shutdown::Receiver,
}
/// A plan to flush at a given instant due to a recently-saved recording's `flush_if_sec` parameter.
@@ -155,13 +157,14 @@ impl Eq for PlannedFlush {}
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
pub fn start_syncer<C>(
db: Arc<db::Database<C>>,
shutdown_rx: base::shutdown::Receiver,
dir_id: i32,
) -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error>
where
C: Clocks + Clone,
{
let db2 = db.clone();
let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
let (mut syncer, path) = Syncer::new(&db.lock(), shutdown_rx, db2, dir_id)?;
syncer.initial_rotation()?;
let (snd, rcv) = mpsc::channel();
db.lock().on_flush(Box::new({
@@ -199,7 +202,8 @@ pub fn lower_retention(
limits: &[NewLimit],
) -> Result<(), Error> {
let db2 = db.clone();
let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
let (_tx, rx) = base::shutdown::channel();
let (mut syncer, _) = Syncer::new(&db.lock(), rx, db2, dir_id)?;
syncer.do_rotation(|db| {
for l in limits {
let (fs_bytes_before, extra);
@@ -305,6 +309,7 @@ fn list_files_to_abandon(
impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
fn new(
l: &db::LockedDatabase,
shutdown_rx: base::shutdown::Receiver,
db: Arc<db::Database<C>>,
dir_id: i32,
) -> Result<(Self, String), Error> {
@@ -346,6 +351,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
Ok((
Syncer {
dir_id,
shutdown_rx,
dir,
db,
planned_flushes: std::collections::BinaryHeap::new(),
@@ -438,8 +444,16 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
// Have a command; handle it.
match cmd {
SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => self.save(id, wall_dur, f),
SyncerCommand::DatabaseFlushed => self.collect_garbage(),
SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => {
if self.save(id, wall_dur, f).is_err() {
return false;
}
}
SyncerCommand::DatabaseFlushed => {
if self.collect_garbage().is_err() {
return false;
}
}
SyncerCommand::Flush(flush) => {
// The sender is waiting for the supplied writer to be dropped. If there's no
// timeout, do so immediately; otherwise wait for that timeout then drop it.
@@ -453,7 +467,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
}
/// Collects garbage (without forcing a sync). Called from worker thread.
fn collect_garbage(&mut self) {
fn collect_garbage(&mut self) -> Result<(), ShutdownError> {
trace!("Collecting garbage");
let mut garbage: Vec<_> = {
let l = self.db.lock();
@@ -461,11 +475,11 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
d.garbage_needs_unlink.iter().copied().collect()
};
if garbage.is_empty() {
return;
return Ok(());
}
let c = &self.db.clocks();
for &id in &garbage {
clock::retry_forever(c, &mut || {
clock::retry(c, &self.shutdown_rx, &mut || {
if let Err(e) = self.dir.unlink_file(id) {
if e == nix::Error::ENOENT {
warn!("dir: recording {} already deleted!", id);
@@ -474,25 +488,33 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
return Err(e);
}
Ok(())
});
})?;
}
clock::retry_forever(c, &mut || self.dir.sync());
clock::retry_forever(c, &mut || {
clock::retry(c, &self.shutdown_rx, &mut || self.dir.sync())?;
clock::retry(c, &self.shutdown_rx, &mut || {
self.db.lock().delete_garbage(self.dir_id, &mut garbage)
});
})?;
Ok(())
}
/// Saves the given recording and prompts rotation. Called from worker thread.
/// Note that this doesn't flush immediately; SQLite transactions are batched to lower SSD
/// wear. On the next flush, the old recordings will actually be marked as garbage in the
/// database, and shortly afterward actually deleted from disk.
fn save(&mut self, id: CompositeId, wall_duration: recording::Duration, f: D::File) {
fn save(
&mut self,
id: CompositeId,
wall_duration: recording::Duration,
f: D::File,
) -> Result<(), ShutdownError> {
trace!("Processing save for {}", id);
let stream_id = id.stream();
// Free up a like number of bytes.
clock::retry_forever(&self.db.clocks(), &mut || f.sync_all());
clock::retry_forever(&self.db.clocks(), &mut || self.dir.sync());
clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || f.sync_all())?;
clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || {
self.dir.sync()
})?;
let mut db = self.db.lock();
db.mark_synced(id).unwrap();
delete_recordings(&mut db, stream_id, 0).unwrap();
@@ -518,6 +540,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
recording: id,
senders: Vec::new(),
});
Ok(())
}
/// Flushes the database if necessary to honor `flush_if_sec` for some recording.
@@ -612,8 +635,8 @@ struct InnerWriter<F: FileWriter> {
hasher: blake3::Hasher,
/// The start time of this segment, based solely on examining the local clock after frames in
/// this segment were received. Frames can suffer from various kinds of delay (initial
/// The start time of this recording, based solely on examining the local clock after frames in
/// this recording were received. Frames can suffer from various kinds of delay (initial
/// buffering, encoding, and network transmission), so this time is set to far in the future on
/// construction, given a real value on the first packet, and decreased as less-delayed packets
/// are discovered. See design/time.md for details.
@@ -625,7 +648,8 @@ struct InnerWriter<F: FileWriter> {
/// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
///
/// Invariant: this should always be `Some` (briefly violated during `write` call only).
/// `unindexed_sample` should always be `Some`, except when a `write` call has aborted on
/// shutdown. In that case, the close will be unable to write the full segment.
unindexed_sample: Option<UnindexedSample>,
}
@@ -670,7 +694,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
/// correcting this.
fn open(&mut self) -> Result<(), Error> {
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
let prev = match self.state {
WriterState::Unopened => None,
WriterState::Open(_) => return Ok(()),
@@ -688,7 +712,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
..Default::default()
},
)?;
let f = clock::retry_forever(&self.db.clocks(), &mut || self.dir.create_file(id));
let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || {
self.dir.create_file(id)
})?;
self.state = WriterState::Open(InnerWriter {
f,
@@ -710,16 +736,17 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
})
}
/// Writes a new frame to this segment.
/// Writes a new frame to this recording.
/// `local_time` should be the local clock's time as of when this packet was received.
pub fn write(
&mut self,
shutdown_rx: &mut base::shutdown::Receiver,
pkt: &[u8],
local_time: recording::Time,
pts_90k: i64,
is_key: bool,
) -> Result<(), Error> {
self.open()?;
self.open(shutdown_rx)?;
let w = match self.state {
WriterState::Open(ref mut w) => w,
_ => unreachable!(),
@@ -763,7 +790,18 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
}
let mut remaining = pkt;
while !remaining.is_empty() {
let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining));
let written =
match clock::retry(&self.db.clocks(), shutdown_rx, &mut || w.f.write(remaining)) {
Ok(w) => w,
Err(e) => {
// close() will do nothing because unindexed_sample will be None.
log::warn!(
"Abandoning incompletely written recording {} on shutdown",
w.id
);
return Err(e.into());
}
};
remaining = &remaining[written..];
}
w.unindexed_sample = Some(UnindexedSample {
@@ -855,10 +893,12 @@ impl<F: FileWriter> InnerWriter<F> {
db: &db::Database<C>,
stream_id: i32,
) -> Result<PreviousWriter, Error> {
let unindexed = self
.unindexed_sample
.take()
.expect("should always be an unindexed sample");
let unindexed = self.unindexed_sample.take().ok_or_else(|| {
format_err!(
"Unable to add recording {} to database due to aborted write",
self.id
)
})?;
let (last_sample_duration, flags) = match next_pts {
None => (0, db::RecordingFlags::TrailingZero as i32),
Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0),
@@ -1050,8 +1090,10 @@ mod tests {
_tmpdir: ::tempfile::TempDir,
dir: MockDir,
channel: super::SyncerChannel<MockFile>,
_shutdown_tx: base::shutdown::Sender,
shutdown_rx: base::shutdown::Receiver,
syncer: super::Syncer<SimulatedClocks, MockDir>,
syncer_rcv: mpsc::Receiver<super::SyncerCommand<MockFile>>,
syncer_rx: mpsc::Receiver<super::SyncerCommand<MockFile>>,
}
fn new_harness(flush_if_sec: i64) -> Harness {
@@ -1072,6 +1114,7 @@ mod tests {
// Start a mock syncer.
let dir = MockDir::new();
let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
let syncer = super::Syncer {
dir_id: *tdb
.db
@@ -1083,10 +1126,11 @@ mod tests {
dir: dir.clone(),
db: tdb.db.clone(),
planned_flushes: std::collections::BinaryHeap::new(),
shutdown_rx: shutdown_rx.clone(),
};
let (syncer_snd, syncer_rcv) = mpsc::channel();
let (syncer_tx, syncer_rx) = mpsc::channel();
tdb.db.lock().on_flush(Box::new({
let snd = syncer_snd.clone();
let snd = syncer_tx.clone();
move || {
if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
@@ -1098,9 +1142,11 @@ mod tests {
dir,
db: tdb.db,
_tmpdir: tdb.tmpdir,
channel: super::SyncerChannel(syncer_snd),
channel: super::SyncerChannel(syncer_tx),
_shutdown_tx: shutdown_tx,
shutdown_rx,
syncer,
syncer_rcv,
syncer_rx,
}
}
@@ -1144,19 +1190,26 @@ mod tests {
));
f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"1", recording::Time(1), 0, true).unwrap();
w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true)
.unwrap();
let e = w
.write(b"2", recording::Time(2), i32::max_value() as i64 + 1, true)
.write(
&mut h.shutdown_rx,
b"2",
recording::Time(2),
i32::max_value() as i64 + 1,
true,
)
.unwrap_err();
assert!(e.to_string().contains("excessive pts jump"));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
}
@@ -1206,14 +1259,15 @@ mod tests {
Ok(3)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true)
.unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1)).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
@@ -1231,7 +1285,8 @@ mod tests {
Ok(1)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true)
.unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(
CompositeId::new(1, 0),
@@ -1252,15 +1307,15 @@ mod tests {
drop(w);
trace!("expecting AsyncSave");
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
trace!("expecting planned flush");
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
trace!("expecting DatabaseFlushed");
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
trace!("expecting DatabaseFlushed again");
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed again
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed again
f.ensure_done();
h.dir.ensure_done();
@@ -1277,13 +1332,13 @@ mod tests {
}
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
assert_eq!(
h.syncer_rcv.try_recv().err(),
h.syncer_rx.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected)
);
assert!(h.syncer.planned_flushes.is_empty());
@@ -1341,16 +1396,17 @@ mod tests {
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"1234", recording::Time(1), 0, true).unwrap();
w.write(&mut h.shutdown_rx, b"1234", recording::Time(1), 0, true)
.unwrap();
h.dir
.expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO))));
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
@@ -1365,7 +1421,7 @@ mod tests {
drop(h.channel);
h.db.lock().clear_on_flush();
assert_eq!(
h.syncer_rcv.try_recv().err(),
h.syncer_rx.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected)
);
assert!(h.syncer.planned_flushes.is_empty());
@@ -1415,15 +1471,16 @@ mod tests {
Ok(3)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"123", recording::Time(2), 0, true).unwrap();
w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true)
.unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
w.close(Some(1)).unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
@@ -1441,7 +1498,8 @@ mod tests {
Ok(1)
})));
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(b"4", recording::Time(3), 1, true).unwrap();
w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true)
.unwrap();
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
h.dir.expect(MockDirAction::Unlink(
CompositeId::new(1, 0),
@@ -1473,11 +1531,11 @@ mod tests {
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f.ensure_done();
h.dir.ensure_done();
@@ -1493,13 +1551,13 @@ mod tests {
assert!(dir.garbage_unlinked.is_empty());
}
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
// The syncer should shut down cleanly.
drop(h.channel);
h.db.lock().clear_on_flush();
assert_eq!(
h.syncer_rcv.try_recv().err(),
h.syncer_rx.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected)
);
assert!(h.syncer.planned_flushes.is_empty());
@@ -1546,6 +1604,7 @@ mod tests {
})));
f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(
&mut h.shutdown_rx,
b"123",
recording::Time(recording::TIME_UNITS_PER_SEC),
0,
@@ -1555,12 +1614,12 @@ mod tests {
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 1);
// Flush and let 30 seconds go by.
h.db.lock().flush("forced").unwrap();
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
assert_eq!(h.syncer.planned_flushes.len(), 1);
h.db.clocks().sleep(time::Duration::seconds(30));
@@ -1586,6 +1645,7 @@ mod tests {
})));
f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
w.write(
&mut h.shutdown_rx,
b"4",
recording::Time(31 * recording::TIME_UNITS_PER_SEC),
1,
@@ -1596,21 +1656,21 @@ mod tests {
drop(w);
assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
assert_eq!(h.syncer.planned_flushes.len(), 2);
assert_eq!(h.syncer.planned_flushes.len(), 2);
let db_flush_count_before = h.db.lock().flushes();
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0));
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush (no-op)
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush (no-op)
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0));
assert_eq!(h.db.lock().flushes(), db_flush_count_before);
assert_eq!(h.syncer.planned_flushes.len(), 1);
assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0));
assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1);
assert_eq!(h.syncer.planned_flushes.len(), 0);
assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
f1.ensure_done();
f2.ensure_done();
@@ -1620,7 +1680,7 @@ mod tests {
drop(h.channel);
h.db.lock().clear_on_flush();
assert_eq!(
h.syncer_rcv.try_recv().err(),
h.syncer_rx.try_recv().err(),
Some(std::sync::mpsc::TryRecvError::Disconnected)
);
assert!(h.syncer.planned_flushes.is_empty());