Merge branch 'master' into new-schema
@@ -1,6 +1,6 @@
 [package]
 name = "moonfire-db"
-version = "0.6.5"
+version = "0.6.7"
 authors = ["Scott Lamb <slamb@slamb.org>"]
 readme = "../README.md"
 edition = "2018"
@@ -28,7 +28,7 @@ libc = "0.2"
 libpasta = "0.1.2"
 log = "0.4"
 mylog = { git = "https://github.com/scottlamb/mylog" }
-nix = "0.22.0"
+nix = "0.23.0"
 num-rational = { version = "0.4.0", default-features = false, features = ["std"] }
 odds = { version = "0.4.0", features = ["std-vec"] }
 parking_lot = { version = "0.11.1", features = [] }
@@ -36,7 +36,7 @@ pretty-hex = "0.2.1"
 prettydiff = { git = "https://github.com/scottlamb/prettydiff", branch = "pr-update-deps" }
 protobuf = { git = "https://github.com/stepancheg/rust-protobuf" }
 ring = "0.16.2"
-rusqlite = "0.25.3"
+rusqlite = "0.26.1"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 smallvec = "1.0"

@@ -33,6 +33,7 @@ use crate::raw;
 use crate::recording;
 use crate::schema;
 use crate::signal;
+use base::bail_t;
 use base::clock::{self, Clocks};
 use base::strutil::encode_size;
 use failure::{bail, format_err, Error, ResultExt};
@@ -214,6 +215,7 @@ pub struct ListAggregatedRecordingsRow {
     pub open_id: u32,
     pub first_uncommitted: Option<i32>,
     pub growing: bool,
+    pub has_trailing_zero: bool,
 }

 impl ListAggregatedRecordingsRow {
@@ -237,6 +239,7 @@ impl ListAggregatedRecordingsRow {
                 None
             },
             growing,
+            has_trailing_zero: (row.flags & RecordingFlags::TrailingZero as i32) != 0,
         }
     }
 }
@@ -341,7 +344,7 @@ impl SampleFileDir {
     }

     /// Returns expected existing metadata when opening this directory.
-    fn meta(&self, db_uuid: &Uuid) -> schema::DirMeta {
+    fn expected_meta(&self, db_uuid: &Uuid) -> schema::DirMeta {
         let mut meta = schema::DirMeta::default();
         meta.db_uuid.extend_from_slice(&db_uuid.as_bytes()[..]);
         meta.dir_uuid.extend_from_slice(&self.uuid.as_bytes()[..]);
@@ -1172,20 +1175,20 @@ impl LockedDatabase {
             if dir.dir.is_some() {
                 continue;
             }
-            let mut meta = dir.meta(&self.uuid);
+            let mut expected_meta = dir.expected_meta(&self.uuid);
             if let Some(o) = self.open.as_ref() {
-                let open = meta.in_progress_open.set_default();
+                let open = expected_meta.in_progress_open.set_default();
                 open.id = o.id;
                 open.uuid.extend_from_slice(&o.uuid.as_bytes()[..]);
             }
-            let d = dir::SampleFileDir::open(&dir.path, &meta)
+            let d = dir::SampleFileDir::open(&dir.path, &expected_meta)
                 .map_err(|e| e.context(format!("Failed to open dir {}", dir.path)))?;
             if self.open.is_none() {
                 // read-only mode; it's already fully opened.
                 dir.dir = Some(d);
             } else {
                 // read-write mode; there are more steps to do.
-                e.insert((meta, d));
+                e.insert((expected_meta, d));
             }
         }
@@ -1211,8 +1214,7 @@ impl LockedDatabase {

         for (id, (mut meta, d)) in in_progress.drain() {
             let dir = self.sample_file_dirs_by_id.get_mut(&id).unwrap();
-            meta.last_complete_open.clear();
-            mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
+            meta.last_complete_open = meta.in_progress_open.take().into();
             d.write_meta(&meta)?;
             dir.dir = Some(d);
         }
@@ -1247,10 +1249,10 @@ impl LockedDatabase {
         &self,
         stream_id: i32,
         desired_time: Range<recording::Time>,
-        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), Error>,
-    ) -> Result<(), Error> {
+        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), base::Error>,
+    ) -> Result<(), base::Error> {
         let s = match self.streams_by_id.get(&stream_id) {
-            None => bail!("no such stream {}", stream_id),
+            None => bail_t!(NotFound, "no such stream {}", stream_id),
             Some(s) => s,
         };
         raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?;
@@ -1280,10 +1282,10 @@ impl LockedDatabase {
         &self,
         stream_id: i32,
         desired_ids: Range<i32>,
-        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), Error>,
-    ) -> Result<(), Error> {
+        f: &mut dyn FnMut(ListRecordingsRow) -> Result<(), base::Error>,
+    ) -> Result<(), base::Error> {
         let s = match self.streams_by_id.get(&stream_id) {
-            None => bail!("no such stream {}", stream_id),
+            None => bail_t!(NotFound, "no such stream {}", stream_id),
             Some(s) => s,
         };
         if desired_ids.start < s.cum_recordings {
@@ -1321,8 +1323,8 @@ impl LockedDatabase {
         stream_id: i32,
         desired_time: Range<recording::Time>,
         forced_split: recording::Duration,
-        f: &mut dyn FnMut(&ListAggregatedRecordingsRow) -> Result<(), Error>,
-    ) -> Result<(), Error> {
+        f: &mut dyn FnMut(&ListAggregatedRecordingsRow) -> Result<(), base::Error>,
+    ) -> Result<(), base::Error> {
         // Iterate, maintaining a map from a recording_id to the aggregated row for the latest
         // batch of recordings from the run starting at that id. Runs can be split into multiple
         // batches for a few reasons:
@@ -1343,6 +1345,7 @@ impl LockedDatabase {
             let run_start_id = recording_id - row.run_offset;
             let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
             let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
+            let has_trailing_zero = (row.flags & RecordingFlags::TrailingZero as i32) != 0;
             use std::collections::btree_map::Entry;
             match aggs.entry(run_start_id) {
                 Entry::Occupied(mut e) => {
@@ -1359,7 +1362,8 @@ impl LockedDatabase {
                     } else {
                         // append.
                         if a.time.end != row.start {
-                            bail!(
+                            bail_t!(
+                                Internal,
                                 "stream {} recording {} ends at {} but {} starts at {}",
                                 stream_id,
                                 a.ids.end - 1,
@@ -1369,7 +1373,8 @@ impl LockedDatabase {
                             );
                         }
                         if a.open_id != row.open_id {
-                            bail!(
+                            bail_t!(
+                                Internal,
                                 "stream {} recording {} has open id {} but {} has {}",
                                 stream_id,
                                 a.ids.end - 1,
@@ -1387,6 +1392,7 @@ impl LockedDatabase {
                         a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
                     }
                     a.growing = growing;
+                    a.has_trailing_zero = has_trailing_zero;
                 }
             }
             Entry::Vacant(e) => {
@@ -1763,14 +1769,13 @@ impl LockedDatabase {
                 path,
                 uuid,
                 dir: Some(dir),
-                last_complete_open: None,
+                last_complete_open: Some(*o),
                 garbage_needs_unlink: FnvHashSet::default(),
                 garbage_unlinked: Vec::new(),
             }),
             Entry::Occupied(_) => bail!("duplicate sample file dir id {}", id),
         };
-        d.last_complete_open = Some(*o);
-        mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open);
+        meta.last_complete_open = meta.in_progress_open.take().into();
         d.dir.as_ref().unwrap().write_meta(&meta)?;
         Ok(id)
     }
@@ -1792,7 +1797,7 @@ impl LockedDatabase {
             );
         }
         let dir = match d.get_mut().dir.take() {
-            None => dir::SampleFileDir::open(&d.get().path, &d.get().meta(&self.uuid))?,
+            None => dir::SampleFileDir::open(&d.get().path, &d.get().expected_meta(&self.uuid))?,
             Some(arc) => match Arc::strong_count(&arc) {
                 1 => {
                     d.get_mut().dir = Some(arc); // put it back.
@@ -1807,7 +1812,7 @@ impl LockedDatabase {
                 &d.get().path
            );
         }
-        let mut meta = d.get().meta(&self.uuid);
+        let mut meta = d.get().expected_meta(&self.uuid);
         meta.in_progress_open = meta.last_complete_open.take().into();
         dir.write_meta(&meta)?;
         if self

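The two-statement clear()/mem::swap dance above becomes a single `meta.last_complete_open = meta.in_progress_open.take().into()`. A standalone sketch of why the two forms end in the same state, using a hypothetical `Meta` struct with plain `Option` fields rather than the generated protobuf type (whose `.into()` merely converts the taken `Option` back into the field wrapper):

    // Standalone sketch with a hypothetical Meta holding plain Options (the
    // real fields are protobuf wrappers, hence the extra `.into()` in the diff).
    #[derive(Clone, Debug, Default, PartialEq)]
    struct Open {
        id: u32,
    }

    #[derive(Clone, Debug, Default, PartialEq)]
    struct Meta {
        last_complete_open: Option<Open>,
        in_progress_open: Option<Open>,
    }

    fn main() {
        let start = Meta {
            last_complete_open: Some(Open { id: 1 }),
            in_progress_open: Some(Open { id: 2 }),
        };

        // Old form: clear the destination, then swap the two fields.
        let mut a = start.clone();
        a.last_complete_open = None;
        std::mem::swap(&mut a.last_complete_open, &mut a.in_progress_open);

        // New form: move the value across in one statement; take() leaves None.
        let mut b = start;
        b.last_complete_open = b.in_progress_open.take();

        // Both end with last_complete_open = Some(Open { id: 2 }) and
        // in_progress_open = None.
        assert_eq!(a, b);
    }
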
@@ -212,8 +212,8 @@ impl SampleFileDir {
     ///
     /// `db_meta.in_progress_open` should be filled if the directory should be opened in read/write
     /// mode; absent in read-only mode.
-    pub fn open(path: &str, db_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
-        let read_write = db_meta.in_progress_open.is_some();
+    pub fn open(path: &str, expected_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
+        let read_write = expected_meta.in_progress_open.is_some();
         let s = SampleFileDir::open_self(path, false)?;
         s.fd.lock(if read_write {
             FlockArg::LockExclusiveNonblock
@@ -222,45 +222,50 @@ impl SampleFileDir {
         })
         .map_err(|e| e.context(format!("unable to lock dir {}", path)))?;
         let dir_meta = read_meta(&s.fd).map_err(|e| e.context("unable to read meta file"))?;
-        if !SampleFileDir::consistent(db_meta, &dir_meta) {
-            let serialized = db_meta
-                .write_length_delimited_to_bytes()
-                .expect("proto3->vec is infallible");
+        if let Err(e) = SampleFileDir::check_consistent(expected_meta, &dir_meta) {
             bail!(
-                "metadata mismatch.\ndb: {:#?}\ndir: {:#?}\nserialized db: {:#?}",
-                db_meta,
-                &dir_meta,
-                &serialized
+                "metadata mismatch: {}.\nexpected:\n{:#?}\n\nactual:\n{:#?}",
+                e,
+                expected_meta,
+                &dir_meta
             );
         }
-        if db_meta.in_progress_open.is_some() {
-            s.write_meta(db_meta)?;
+        if expected_meta.in_progress_open.is_some() {
+            s.write_meta(expected_meta)?;
         }
         Ok(s)
     }

-    /// Returns true if the existing directory and database metadata are consistent; the directory
+    /// Checks that the existing directory and database metadata are consistent; the directory
     /// is then openable.
-    pub(crate) fn consistent(db_meta: &schema::DirMeta, dir_meta: &schema::DirMeta) -> bool {
-        if dir_meta.db_uuid != db_meta.db_uuid {
-            return false;
+    pub(crate) fn check_consistent(
+        expected_meta: &schema::DirMeta,
+        actual_meta: &schema::DirMeta,
+    ) -> Result<(), String> {
+        if actual_meta.db_uuid != expected_meta.db_uuid {
+            return Err("db uuid mismatch".into());
         }
-        if dir_meta.dir_uuid != db_meta.dir_uuid {
-            return false;
+        if actual_meta.dir_uuid != expected_meta.dir_uuid {
+            return Err("dir uuid mismatch".into());
         }

-        if db_meta.last_complete_open.is_some()
-            && (db_meta.last_complete_open != dir_meta.last_complete_open
-                && db_meta.last_complete_open != dir_meta.in_progress_open)
+        if expected_meta.last_complete_open.is_some()
+            && (expected_meta.last_complete_open != actual_meta.last_complete_open
+                && expected_meta.last_complete_open != actual_meta.in_progress_open)
         {
-            return false;
+            return Err(format!(
+                "expected open {:?}; but got {:?} (complete) or {:?} (in progress)",
+                &expected_meta.last_complete_open,
+                &actual_meta.last_complete_open,
+                &actual_meta.in_progress_open,
+            ));
         }

-        if db_meta.last_complete_open.is_none() && dir_meta.last_complete_open.is_some() {
-            return false;
+        if expected_meta.last_complete_open.is_none() && actual_meta.last_complete_open.is_some() {
+            return Err("expected never opened".into());
        }

-        true
+        Ok(())
     }

     pub(crate) fn create(

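The `consistent` to `check_consistent` change swaps a bare `bool` for `Result<(), String>`, so a mismatch carries its reason into the caller's error message instead of forcing the caller to dump both protos and guess. A minimal standalone sketch of the same refactor on a hypothetical two-field meta struct (not the generated `schema::DirMeta`):

    // Hypothetical standalone version of the refactor: returning
    // Result<(), String> instead of bool lets the caller report *which*
    // field mismatched.
    #[derive(Debug, Default, PartialEq)]
    struct Meta {
        db_uuid: Vec<u8>,
        dir_uuid: Vec<u8>,
    }

    fn check_consistent(expected: &Meta, actual: &Meta) -> Result<(), String> {
        if actual.db_uuid != expected.db_uuid {
            return Err("db uuid mismatch".into());
        }
        if actual.dir_uuid != expected.dir_uuid {
            return Err("dir uuid mismatch".into());
        }
        Ok(())
    }

    fn main() {
        let expected = Meta { db_uuid: vec![1], dir_uuid: vec![2] };
        let actual = Meta { db_uuid: vec![1], dir_uuid: vec![3] };
        // The reason string can be folded into a caller's bail!() message.
        assert_eq!(
            check_consistent(&expected, &actual),
            Err("dir uuid mismatch".into())
        );
    }
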
@@ -6,7 +6,8 @@

 use crate::db::{self, CompositeId, FromSqlUuid};
 use crate::recording;
-use failure::{bail, Error, ResultExt};
+use base::{ErrorKind, ResultExt as _};
+use failure::{bail, Error, ResultExt as _};
 use fnv::FnvHashSet;
 use rusqlite::{named_params, params};
 use std::ops::Range;
@@ -103,14 +104,18 @@ pub(crate) fn list_recordings_by_time(
     conn: &rusqlite::Connection,
     stream_id: i32,
     desired_time: Range<recording::Time>,
-    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), Error>,
-) -> Result<(), Error> {
-    let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_TIME_SQL)?;
-    let rows = stmt.query(named_params! {
-        ":stream_id": stream_id,
-        ":start_time_90k": desired_time.start.0,
-        ":end_time_90k": desired_time.end.0,
-    })?;
+    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), base::Error>,
+) -> Result<(), base::Error> {
+    let mut stmt = conn
+        .prepare_cached(LIST_RECORDINGS_BY_TIME_SQL)
+        .err_kind(ErrorKind::Internal)?;
+    let rows = stmt
+        .query(named_params! {
+            ":stream_id": stream_id,
+            ":start_time_90k": desired_time.start.0,
+            ":end_time_90k": desired_time.end.0,
+        })
+        .err_kind(ErrorKind::Internal)?;
     list_recordings_inner(rows, false, f)
 }

@@ -119,39 +124,46 @@ pub(crate) fn list_recordings_by_id(
     conn: &rusqlite::Connection,
     stream_id: i32,
     desired_ids: Range<i32>,
-    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), Error>,
-) -> Result<(), Error> {
-    let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
-    let rows = stmt.query(named_params! {
-        ":start": CompositeId::new(stream_id, desired_ids.start).0,
-        ":end": CompositeId::new(stream_id, desired_ids.end).0,
-    })?;
+    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), base::Error>,
+) -> Result<(), base::Error> {
+    let mut stmt = conn
+        .prepare_cached(LIST_RECORDINGS_BY_ID_SQL)
+        .err_kind(ErrorKind::Internal)?;
+    let rows = stmt
+        .query(named_params! {
+            ":start": CompositeId::new(stream_id, desired_ids.start).0,
+            ":end": CompositeId::new(stream_id, desired_ids.end).0,
+        })
+        .err_kind(ErrorKind::Internal)?;
     list_recordings_inner(rows, true, f)
 }

 fn list_recordings_inner(
     mut rows: rusqlite::Rows,
     include_prev: bool,
-    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), Error>,
-) -> Result<(), Error> {
-    while let Some(row) = rows.next()? {
-        let wall_duration_90k = row.get(4)?;
-        let media_duration_delta_90k: i32 = row.get(5)?;
+    f: &mut dyn FnMut(db::ListRecordingsRow) -> Result<(), base::Error>,
+) -> Result<(), base::Error> {
+    while let Some(row) = rows.next().err_kind(ErrorKind::Internal)? {
+        let wall_duration_90k = row.get(4).err_kind(ErrorKind::Internal)?;
+        let media_duration_delta_90k: i32 = row.get(5).err_kind(ErrorKind::Internal)?;
         f(db::ListRecordingsRow {
-            id: CompositeId(row.get(0)?),
-            run_offset: row.get(1)?,
-            flags: row.get(2)?,
-            start: recording::Time(row.get(3)?),
+            id: CompositeId(row.get(0).err_kind(ErrorKind::Internal)?),
+            run_offset: row.get(1).err_kind(ErrorKind::Internal)?,
+            flags: row.get(2).err_kind(ErrorKind::Internal)?,
+            start: recording::Time(row.get(3).err_kind(ErrorKind::Internal)?),
             wall_duration_90k,
             media_duration_90k: wall_duration_90k + media_duration_delta_90k,
-            sample_file_bytes: row.get(6)?,
-            video_samples: row.get(7)?,
-            video_sync_samples: row.get(8)?,
-            video_sample_entry_id: row.get(9)?,
-            open_id: row.get(10)?,
+            sample_file_bytes: row.get(6).err_kind(ErrorKind::Internal)?,
+            video_samples: row.get(7).err_kind(ErrorKind::Internal)?,
+            video_sync_samples: row.get(8).err_kind(ErrorKind::Internal)?,
+            video_sample_entry_id: row.get(9).err_kind(ErrorKind::Internal)?,
+            open_id: row.get(10).err_kind(ErrorKind::Internal)?,
             prev_media_duration_and_runs: match include_prev {
                 false => None,
-                true => Some((recording::Duration(row.get(11)?), row.get(12)?)),
+                true => Some((
+                    recording::Duration(row.get(11).err_kind(ErrorKind::Internal)?),
+                    row.get(12).err_kind(ErrorKind::Internal)?,
+                )),
             },
         })?;
     }

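This file now maps every rusqlite failure to a typed `base::Error` via `.err_kind(ErrorKind::Internal)` so the callbacks can return `base::Error` end to end. A minimal sketch of such an adapter, assuming a simplified `Error`/`ErrorKind` pair rather than moonfire-base's actual types:

    // Simplified Error/ErrorKind pair plus the extension trait; moonfire-base's
    // real ResultExt carries richer context, but the same shape is assumed.
    use std::fmt;

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum ErrorKind {
        Internal,
        NotFound,
    }

    #[derive(Debug)]
    struct Error {
        kind: ErrorKind,
        msg: String,
    }

    trait ResultExt<T> {
        /// Wraps any displayable error, tagging it with the given kind.
        fn err_kind(self, kind: ErrorKind) -> Result<T, Error>;
    }

    impl<T, E: fmt::Display> ResultExt<T> for Result<T, E> {
        fn err_kind(self, kind: ErrorKind) -> Result<T, Error> {
            self.map_err(|e| Error {
                kind,
                msg: e.to_string(),
            })
        }
    }

    fn main() {
        // A low-level parse failure becomes a typed Internal error, the same
        // way the diff treats unexpected SQLite failures.
        let r: Result<i32, std::num::ParseIntError> = "nope".parse();
        let e = r.err_kind(ErrorKind::Internal).unwrap_err();
        assert_eq!(e.kind, ErrorKind::Internal);
        assert!(!e.msg.is_empty());
        let _ = ErrorKind::NotFound; // the kind bail_t!(NotFound, ...) would use
    }
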
@@ -51,6 +51,8 @@ pub fn init() {
 pub struct TestDb<C: Clocks + Clone> {
     pub db: Arc<db::Database<C>>,
     pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
+    pub shutdown_tx: base::shutdown::Sender,
+    pub shutdown_rx: base::shutdown::Receiver,
     pub syncer_channel: writer::SyncerChannel<::std::fs::File>,
     pub syncer_join: thread::JoinHandle<()>,
     pub tmpdir: TempDir,
@@ -119,11 +121,14 @@ impl<C: Clocks + Clone> TestDb<C> {
         }
         let mut dirs_by_stream_id = FnvHashMap::default();
         dirs_by_stream_id.insert(TEST_STREAM_ID, dir);
+        let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
         let (syncer_channel, syncer_join) =
-            writer::start_syncer(db.clone(), sample_file_dir_id).unwrap();
+            writer::start_syncer(db.clone(), shutdown_rx.clone(), sample_file_dir_id).unwrap();
         TestDb {
             db,
             dirs_by_stream_id: Arc::new(dirs_by_stream_id),
+            shutdown_tx,
+            shutdown_rx,
             syncer_channel,
             syncer_join,
             tmpdir,

@@ -37,11 +37,12 @@ fn maybe_upgrade_meta(dir: &dir::Fd, db_meta: &schema::DirMeta) -> Result<bool,
     dir_meta
         .merge_from(&mut s)
         .map_err(|e| e.context("Unable to parse metadata proto: {}"))?;
-    if !dir::SampleFileDir::consistent(&db_meta, &dir_meta) {
+    if let Err(e) = dir::SampleFileDir::check_consistent(&db_meta, &dir_meta) {
         bail!(
-            "Inconsistent db_meta={:?} dir_meta={:?}",
+            "Inconsistent db_meta={:?} dir_meta={:?}: {}",
             &db_meta,
-            &dir_meta
+            &dir_meta,
+            e
         );
     }
     let mut f = crate::fs::openat(

@@ -8,6 +8,7 @@ use crate::db::{self, CompositeId};
 use crate::dir;
 use crate::recording::{self, MAX_RECORDING_WALL_DURATION};
 use base::clock::{self, Clocks};
+use base::shutdown::ShutdownError;
 use failure::{bail, format_err, Error};
 use fnv::FnvHashMap;
 use log::{debug, trace, warn};
@@ -95,6 +96,7 @@ struct Syncer<C: Clocks + Clone, D: DirWriter> {
     dir: D,
     db: Arc<db::Database<C>>,
     planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
+    shutdown_rx: base::shutdown::Receiver,
 }

 /// A plan to flush at a given instant due to a recently-saved recording's `flush_if_sec` parameter.
@@ -155,13 +157,14 @@ impl Eq for PlannedFlush {}
 /// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
 pub fn start_syncer<C>(
     db: Arc<db::Database<C>>,
+    shutdown_rx: base::shutdown::Receiver,
     dir_id: i32,
 ) -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error>
 where
     C: Clocks + Clone,
 {
     let db2 = db.clone();
-    let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
+    let (mut syncer, path) = Syncer::new(&db.lock(), shutdown_rx, db2, dir_id)?;
     syncer.initial_rotation()?;
     let (snd, rcv) = mpsc::channel();
     db.lock().on_flush(Box::new({
@@ -199,7 +202,8 @@ pub fn lower_retention(
     limits: &[NewLimit],
 ) -> Result<(), Error> {
     let db2 = db.clone();
-    let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
+    let (_tx, rx) = base::shutdown::channel();
+    let (mut syncer, _) = Syncer::new(&db.lock(), rx, db2, dir_id)?;
     syncer.do_rotation(|db| {
         for l in limits {
             let (fs_bytes_before, extra);
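
`start_syncer` and `Syncer::new` now thread a `base::shutdown::Receiver` through, and `lower_retention` fabricates a throwaway channel since it runs to completion anyway. A plausible shape for such a shutdown channel, sketched with an atomic flag that flips when the `Sender` is dropped; the real moonfire-base implementation may differ:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[derive(Debug)]
    pub struct ShutdownError;

    pub struct Sender(Arc<AtomicBool>);

    impl Drop for Sender {
        // Dropping the sender (e.g. after SIGINT) signals every receiver.
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    #[derive(Clone)]
    pub struct Receiver(Arc<AtomicBool>);

    impl Receiver {
        pub fn check(&self) -> Result<(), ShutdownError> {
            if self.0.load(Ordering::SeqCst) {
                Err(ShutdownError)
            } else {
                Ok(())
            }
        }
    }

    pub fn channel() -> (Sender, Receiver) {
        let flag = Arc::new(AtomicBool::new(false));
        (Sender(flag.clone()), Receiver(flag))
    }

    fn main() {
        let (tx, rx) = channel();
        let worker_rx = rx.clone(); // receivers are cheaply cloneable
        assert!(worker_rx.check().is_ok());
        drop(tx);
        assert!(worker_rx.check().is_err()); // workers abort their retry loops
    }
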
@@ -305,6 +309,7 @@ fn list_files_to_abandon(
 impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
     fn new(
         l: &db::LockedDatabase,
+        shutdown_rx: base::shutdown::Receiver,
         db: Arc<db::Database<C>>,
         dir_id: i32,
     ) -> Result<(Self, String), Error> {
@@ -346,6 +351,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         Ok((
             Syncer {
                 dir_id,
+                shutdown_rx,
                 dir,
                 db,
                 planned_flushes: std::collections::BinaryHeap::new(),
@@ -438,8 +444,16 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {

         // Have a command; handle it.
         match cmd {
-            SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => self.save(id, wall_dur, f),
-            SyncerCommand::DatabaseFlushed => self.collect_garbage(),
+            SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => {
+                if self.save(id, wall_dur, f).is_err() {
+                    return false;
+                }
+            }
+            SyncerCommand::DatabaseFlushed => {
+                if self.collect_garbage().is_err() {
+                    return false;
+                }
+            }
             SyncerCommand::Flush(flush) => {
                 // The sender is waiting for the supplied writer to be dropped. If there's no
                 // timeout, do so immediately; otherwise wait for that timeout then drop it.
@@ -453,7 +467,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
     }

     /// Collects garbage (without forcing a sync). Called from worker thread.
-    fn collect_garbage(&mut self) {
+    fn collect_garbage(&mut self) -> Result<(), ShutdownError> {
         trace!("Collecting garbage");
         let mut garbage: Vec<_> = {
             let l = self.db.lock();
@@ -461,11 +475,11 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
             d.garbage_needs_unlink.iter().copied().collect()
         };
         if garbage.is_empty() {
-            return;
+            return Ok(());
         }
         let c = &self.db.clocks();
         for &id in &garbage {
-            clock::retry_forever(c, &mut || {
+            clock::retry(c, &self.shutdown_rx, &mut || {
                 if let Err(e) = self.dir.unlink_file(id) {
                     if e == nix::Error::ENOENT {
                         warn!("dir: recording {} already deleted!", id);
@@ -474,25 +488,33 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
                     return Err(e);
                 }
                 Ok(())
-            });
+            })?;
         }
-        clock::retry_forever(c, &mut || self.dir.sync());
-        clock::retry_forever(c, &mut || {
+        clock::retry(c, &self.shutdown_rx, &mut || self.dir.sync())?;
+        clock::retry(c, &self.shutdown_rx, &mut || {
             self.db.lock().delete_garbage(self.dir_id, &mut garbage)
-        });
+        })?;
+        Ok(())
     }

     /// Saves the given recording and prompts rotation. Called from worker thread.
     /// Note that this doesn't flush immediately; SQLite transactions are batched to lower SSD
     /// wear. On the next flush, the old recordings will actually be marked as garbage in the
     /// database, and shortly afterward actually deleted from disk.
-    fn save(&mut self, id: CompositeId, wall_duration: recording::Duration, f: D::File) {
+    fn save(
+        &mut self,
+        id: CompositeId,
+        wall_duration: recording::Duration,
+        f: D::File,
+    ) -> Result<(), ShutdownError> {
         trace!("Processing save for {}", id);
         let stream_id = id.stream();

         // Free up a like number of bytes.
-        clock::retry_forever(&self.db.clocks(), &mut || f.sync_all());
-        clock::retry_forever(&self.db.clocks(), &mut || self.dir.sync());
+        clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || f.sync_all())?;
+        clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || {
+            self.dir.sync()
+        })?;
         let mut db = self.db.lock();
         db.mark_synced(id).unwrap();
         delete_recordings(&mut db, stream_id, 0).unwrap();
@@ -519,6 +541,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
             recording: id,
             senders: Vec::new(),
         });
+        Ok(())
     }

     /// Flushes the database if necessary to honor `flush_if_sec` for some recording.
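
Every `clock::retry_forever` call above becomes `clock::retry(clocks, &shutdown_rx, ...)?`, so a worker stuck retrying a failing syscall can still exit once shutdown is signaled. A sketch of such a helper under the assumptions visible at the call sites (loop on error, sleep between attempts, bail with `ShutdownError` once the receiver fires); the clocks parameter is elided here and a plain `std::thread::sleep` stands in for sleeping via `Clocks`:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    #[derive(Debug)]
    pub struct ShutdownError;

    // Stand-in for base::shutdown::Receiver from the sketch above.
    pub struct Receiver(Arc<AtomicBool>);

    impl Receiver {
        fn should_shutdown(&self) -> bool {
            self.0.load(Ordering::SeqCst)
        }
    }

    /// Retries `f` until it succeeds or shutdown is signaled, sleeping briefly
    /// between attempts. retry_forever is the same loop minus the early exit.
    pub fn retry<T, E: std::fmt::Debug>(
        shutdown_rx: &Receiver,
        f: &mut dyn FnMut() -> Result<T, E>,
    ) -> Result<T, ShutdownError> {
        loop {
            if shutdown_rx.should_shutdown() {
                return Err(ShutdownError);
            }
            match f() {
                Ok(v) => return Ok(v),
                Err(e) => {
                    eprintln!("operation failed: {:?}; will retry", e);
                    std::thread::sleep(Duration::from_millis(10));
                }
            }
        }
    }

    fn main() {
        let rx = Receiver(Arc::new(AtomicBool::new(false)));
        let mut attempts = 0;
        let v = retry(&rx, &mut || {
            attempts += 1;
            if attempts < 3 { Err("transient") } else { Ok(42) }
        })
        .unwrap();
        assert_eq!((attempts, v), (3, 42));
    }
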
@@ -613,8 +636,8 @@ struct InnerWriter<F: FileWriter> {

     hasher: blake3::Hasher,

-    /// The start time of this segment, based solely on examining the local clock after frames in
-    /// this segment were received. Frames can suffer from various kinds of delay (initial
+    /// The start time of this recording, based solely on examining the local clock after frames in
+    /// this recording were received. Frames can suffer from various kinds of delay (initial
     /// buffering, encoding, and network transmission), so this time is set to far in the future on
     /// construction, given a real value on the first packet, and decreased as less-delayed packets
     /// are discovered. See design/time.md for details.
@@ -626,7 +649,8 @@ struct InnerWriter<F: FileWriter> {
     /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
     /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
     ///
-    /// Invariant: this should always be `Some` (briefly violated during `write` call only).
+    /// `unindexed_sample` should always be `Some`, except when a `write` call has aborted on
+    /// shutdown. In that case, the close will be unable to write the full segment.
     unindexed_sample: Option<UnindexedSample>,
 }

@@ -671,7 +695,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
     /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
     /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
     /// correcting this.
-    fn open(&mut self) -> Result<(), Error> {
+    fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
         let prev = match self.state {
             WriterState::Unopened => None,
             WriterState::Open(_) => return Ok(()),
@@ -689,7 +713,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
                 ..Default::default()
             },
         )?;
-        let f = clock::retry_forever(&self.db.clocks(), &mut || self.dir.create_file(id));
+        let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || {
+            self.dir.create_file(id)
+        })?;

         self.state = WriterState::Open(InnerWriter {
             f,
@@ -711,16 +737,17 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         })
     }

-    /// Writes a new frame to this segment.
+    /// Writes a new frame to this recording.
     /// `local_time` should be the local clock's time as of when this packet was received.
     pub fn write(
         &mut self,
+        shutdown_rx: &mut base::shutdown::Receiver,
         pkt: &[u8],
         local_time: recording::Time,
         pts_90k: i64,
         is_key: bool,
     ) -> Result<(), Error> {
-        self.open()?;
+        self.open(shutdown_rx)?;
         let w = match self.state {
             WriterState::Open(ref mut w) => w,
             _ => unreachable!(),
@@ -764,7 +791,18 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         }
         let mut remaining = pkt;
         while !remaining.is_empty() {
-            let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining));
+            let written =
+                match clock::retry(&self.db.clocks(), shutdown_rx, &mut || w.f.write(remaining)) {
+                    Ok(w) => w,
+                    Err(e) => {
+                        // close() will do nothing because unindexed_sample will be None.
+                        log::warn!(
+                            "Abandoning incompletely written recording {} on shutdown",
+                            w.id
+                        );
+                        return Err(e.into());
+                    }
+                };
             remaining = &remaining[written..];
         }
         w.unindexed_sample = Some(UnindexedSample {
@@ -857,10 +895,12 @@ impl<F: FileWriter> InnerWriter<F> {
         stream_id: i32,
         reason: Option<String>,
     ) -> Result<PreviousWriter, Error> {
-        let unindexed = self
-            .unindexed_sample
-            .take()
-            .expect("should always be an unindexed sample");
+        let unindexed = self.unindexed_sample.take().ok_or_else(|| {
+            format_err!(
+                "Unable to add recording {} to database due to aborted write",
+                self.id
+            )
+        })?;
         let (last_sample_duration, flags) = match next_pts {
             None => (0, db::RecordingFlags::TrailingZero as i32),
             Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0),
@@ -1059,8 +1099,10 @@ mod tests {
         _tmpdir: ::tempfile::TempDir,
         dir: MockDir,
         channel: super::SyncerChannel<MockFile>,
+        _shutdown_tx: base::shutdown::Sender,
+        shutdown_rx: base::shutdown::Receiver,
         syncer: super::Syncer<SimulatedClocks, MockDir>,
-        syncer_rcv: mpsc::Receiver<super::SyncerCommand<MockFile>>,
+        syncer_rx: mpsc::Receiver<super::SyncerCommand<MockFile>>,
     }

     fn new_harness(flush_if_sec: u32) -> Harness {
@@ -1081,6 +1123,7 @@ mod tests {

         // Start a mock syncer.
         let dir = MockDir::new();
+        let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
         let syncer = super::Syncer {
             dir_id: *tdb
                 .db
@@ -1092,10 +1135,11 @@ mod tests {
             dir: dir.clone(),
             db: tdb.db.clone(),
             planned_flushes: std::collections::BinaryHeap::new(),
+            shutdown_rx: shutdown_rx.clone(),
         };
-        let (syncer_snd, syncer_rcv) = mpsc::channel();
+        let (syncer_tx, syncer_rx) = mpsc::channel();
         tdb.db.lock().on_flush(Box::new({
-            let snd = syncer_snd.clone();
+            let snd = syncer_tx.clone();
             move || {
                 if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
                     warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
@@ -1107,9 +1151,11 @@ mod tests {
             dir,
             db: tdb.db,
             _tmpdir: tdb.tmpdir,
-            channel: super::SyncerChannel(syncer_snd),
+            channel: super::SyncerChannel(syncer_tx),
+            _shutdown_tx: shutdown_tx,
+            shutdown_rx,
             syncer,
-            syncer_rcv,
+            syncer_rx,
         }
     }

@@ -1153,19 +1199,26 @@ mod tests {
         ));
         f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"1", recording::Time(1), 0, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true)
+            .unwrap();

         let e = w
-            .write(b"2", recording::Time(2), i32::max_value() as i64 + 1, true)
+            .write(
+                &mut h.shutdown_rx,
+                b"2",
+                recording::Time(2),
+                i32::max_value() as i64 + 1,
+                true,
+            )
             .unwrap_err();
         assert!(e.to_string().contains("excessive pts jump"));
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         drop(w);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         f.ensure_done();
         h.dir.ensure_done();
     }
@@ -1215,14 +1268,15 @@ mod tests {
             Ok(3)
         })));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"123", recording::Time(2), 0, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true)
+            .unwrap();
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         w.close(Some(1), None).unwrap();
-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         f.ensure_done();
         h.dir.ensure_done();

@@ -1240,7 +1294,8 @@ mod tests {
             Ok(1)
         })));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"4", recording::Time(3), 1, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true)
+            .unwrap();
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         h.dir.expect(MockDirAction::Unlink(
             CompositeId::new(1, 0),
@@ -1261,15 +1316,15 @@ mod tests {
         drop(w);

         trace!("expecting AsyncSave");
-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
         trace!("expecting planned flush");
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
         trace!("expecting DatabaseFlushed");
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         trace!("expecting DatabaseFlushed again");
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed again
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed again
         f.ensure_done();
         h.dir.ensure_done();

@@ -1286,13 +1341,13 @@ mod tests {
         }

         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

         // The syncer should shut down cleanly.
         drop(h.channel);
         h.db.lock().clear_on_flush();
         assert_eq!(
-            h.syncer_rcv.try_recv().err(),
+            h.syncer_rx.try_recv().err(),
             Some(std::sync::mpsc::TryRecvError::Disconnected)
         );
         assert!(h.syncer.planned_flushes.is_empty());
@@ -1350,16 +1405,17 @@ mod tests {
         })));
         f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"1234", recording::Time(1), 0, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"1234", recording::Time(1), 0, true)
+            .unwrap();
         h.dir
             .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO))));
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         drop(w);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         f.ensure_done();
         h.dir.ensure_done();

@@ -1374,7 +1430,7 @@ mod tests {
         drop(h.channel);
         h.db.lock().clear_on_flush();
         assert_eq!(
-            h.syncer_rcv.try_recv().err(),
+            h.syncer_rx.try_recv().err(),
             Some(std::sync::mpsc::TryRecvError::Disconnected)
         );
         assert!(h.syncer.planned_flushes.is_empty());
@@ -1424,15 +1480,16 @@ mod tests {
             Ok(3)
         })));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"123", recording::Time(2), 0, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true)
+            .unwrap();
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         w.close(Some(1), None).unwrap();

-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         f.ensure_done();
         h.dir.ensure_done();

@@ -1450,7 +1507,8 @@ mod tests {
             Ok(1)
         })));
         f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
-        w.write(b"4", recording::Time(3), 1, true).unwrap();
+        w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true)
+            .unwrap();
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         h.dir.expect(MockDirAction::Unlink(
             CompositeId::new(1, 0),
@@ -1482,11 +1540,11 @@ mod tests {

         drop(w);

-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         f.ensure_done();
         h.dir.ensure_done();

@@ -1502,13 +1560,13 @@ mod tests {
             assert!(dir.garbage_unlinked.is_empty());
         }

-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

         // The syncer should shut down cleanly.
         drop(h.channel);
         h.db.lock().clear_on_flush();
         assert_eq!(
-            h.syncer_rcv.try_recv().err(),
+            h.syncer_rx.try_recv().err(),
             Some(std::sync::mpsc::TryRecvError::Disconnected)
         );
         assert!(h.syncer.planned_flushes.is_empty());
@@ -1555,6 +1613,7 @@ mod tests {
         })));
         f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
         w.write(
+            &mut h.shutdown_rx,
             b"123",
             recording::Time(recording::TIME_UNITS_PER_SEC),
             0,
@@ -1564,12 +1623,12 @@ mod tests {
         h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
         drop(w);

-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 1);

         // Flush and let 30 seconds go by.
         h.db.lock().flush("forced").unwrap();
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
         assert_eq!(h.syncer.planned_flushes.len(), 1);
         h.db.clocks().sleep(time::Duration::seconds(30));

@@ -1595,6 +1654,7 @@ mod tests {
         })));
         f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
         w.write(
+            &mut h.shutdown_rx,
             b"4",
             recording::Time(31 * recording::TIME_UNITS_PER_SEC),
             1,
@@ -1605,21 +1665,21 @@ mod tests {

         drop(w);

-        assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave
+        assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
         assert_eq!(h.syncer.planned_flushes.len(), 2);

         assert_eq!(h.syncer.planned_flushes.len(), 2);
         let db_flush_count_before = h.db.lock().flushes();
         assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0));
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush (no-op)
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush (no-op)
         assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0));
         assert_eq!(h.db.lock().flushes(), db_flush_count_before);
         assert_eq!(h.syncer.planned_flushes.len(), 1);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush
+        assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
         assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0));
         assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1);
         assert_eq!(h.syncer.planned_flushes.len(), 0);
-        assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed
+        assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

         f1.ensure_done();
         f2.ensure_done();
@@ -1629,7 +1689,7 @@ mod tests {
         drop(h.channel);
         h.db.lock().clear_on_flush();
         assert_eq!(
-            h.syncer_rcv.try_recv().err(),
+            h.syncer_rx.try_recv().err(),
             Some(std::sync::mpsc::TryRecvError::Disconnected)
         );
         assert!(h.syncer.planned_flushes.is_empty());