moonfire-nvr/server/db/writer.rs

1765 lines
61 KiB
Rust
Raw Normal View History

// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
//! Writing recordings and deleting old ones.
use crate::db::{self, CompositeId};
use crate::dir;
use crate::recording::{self, MAX_RECORDING_WALL_DURATION};
use base::clock::{self, Clocks};
use base::shutdown::ShutdownError;
use base::FastHashMap;
use base::{bail, err, Error};
use std::cmp::{self, Ordering};
use std::convert::TryFrom;
use std::io;
use std::mem;
2021-10-26 14:47:13 -04:00
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration as StdDuration;
use time::{Duration, Timespec};
use tracing::{debug, trace, warn};
/// Trait to allow mocking out [crate::dir::SampleFileDir] in syncer tests.
/// This is public because it's exposed in the [SyncerChannel] type parameters,
/// not because it's of direct use outside this module.
pub trait DirWriter: 'static + Send {
type File: FileWriter;
2019-07-12 00:59:01 -04:00
fn create_file(&self, id: CompositeId) -> Result<Self::File, nix::Error>;
fn sync(&self) -> Result<(), nix::Error>;
fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error>;
}
/// Abstraction over [std::fs::File] so syncer tests can substitute a mock.
/// It is public only because it appears in the [SyncerChannel] type parameters;
/// it isn't meant for direct use outside this module.
pub trait FileWriter: 'static {
    /// Same contract as `std::fs::File::sync_all`.
    fn sync_all(&self) -> io::Result<()>;

    /// Same contract as `std::io::Write::write`: may write fewer bytes than supplied and
    /// returns how many were written.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
}
impl DirWriter for Arc<dir::SampleFileDir> {
type File = ::std::fs::File;
2019-07-12 00:59:01 -04:00
fn create_file(&self, id: CompositeId) -> Result<Self::File, nix::Error> {
dir::SampleFileDir::create_file(self, id)
}
fn sync(&self) -> Result<(), nix::Error> {
dir::SampleFileDir::sync(self)
}
2019-07-12 00:59:01 -04:00
fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error> {
dir::SampleFileDir::unlink_file(self, id)
}
}
/// The production [FileWriter]: delegates to the standard library's `File` operations.
impl FileWriter for ::std::fs::File {
    fn sync_all(&self) -> Result<(), io::Error> {
        // Spell out the inherent method so this can't be mistaken for self-recursion.
        ::std::fs::File::sync_all(self)
    }

    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        <Self as io::Write>::write(self, buf)
    }
}
/// A command sent to a [Syncer].
///
/// `F` is the type of the sample file handle carried by a save request.
enum SyncerCommand<F> {
/// Command sent by [SyncerChannel::async_save_recording].
/// Carries the recording's id, its wall duration, and the file to sync.
AsyncSaveRecording(CompositeId, recording::Duration, F),
/// Notes that the database has been flushed and garbage collection should be attempted.
/// [start_syncer] sets up a database callback to send this command.
DatabaseFlushed,
/// Command sent by [SyncerChannel::flush].
/// The syncer never sends on the enclosed channel; it drops the sender to signal completion.
Flush(mpsc::SyncSender<()>),
}
/// A channel which can be used to send commands to the syncer.
/// Can be cloned to allow multiple threads to send commands.
///
/// `F` is the file type carried by save commands; [start_syncer] returns a channel whose
/// `F` is `std::fs::File`.
pub struct SyncerChannel<F>(mpsc::Sender<SyncerCommand<F>>);
impl<F> ::std::clone::Clone for SyncerChannel<F> {
fn clone(&self) -> Self {
SyncerChannel(self.0.clone())
}
}
/// State of the worker thread created by [start_syncer].
struct Syncer<C: Clocks + Clone, D: DirWriter> {
dir_id: i32,
dir: D,
db: Arc<db::Database<C>>,
2019-01-04 14:56:15 -05:00
planned_flushes: std::collections::BinaryHeap<PlannedFlush>,
shutdown_rx: base::shutdown::Receiver,
2019-01-04 14:56:15 -05:00
}
/// A plan to flush at a given instant due to a recently-saved recording's `flush_if_sec` parameter.
2019-01-04 14:56:15 -05:00
struct PlannedFlush {
/// Monotonic time at which this flush should happen.
when: Timespec,
/// Recording which prompts this flush. If this recording is already flushed at the planned
/// time, it can be skipped.
recording: CompositeId,
/// A human-readable reason for the flush, for logs.
reason: String,
/// Senders to drop when this time is reached. This is for test instrumentation; see
/// [SyncerChannel::flush].
senders: Vec<mpsc::SyncSender<()>>,
2019-01-04 14:56:15 -05:00
}
// `BinaryHeap` is a max-heap, but the syncer wants the _soonest_ flush on top. The comparison
// is therefore deliberately inverted: the flush with the earlier `when` orders as greater.
impl Ord for PlannedFlush {
    fn cmp(&self, other: &Self) -> Ordering {
        self.when.cmp(&other.when).reverse()
    }
}
impl PartialOrd for PlannedFlush {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
2019-01-04 14:56:15 -05:00
impl PartialEq for PlannedFlush {
fn eq(&self, other: &Self) -> bool {
self.when == other.when
}
}
2019-01-04 14:56:15 -05:00
// Marker impl required by `Ord`; equality itself is the `when`-only comparison in `PartialEq`.
impl Eq for PlannedFlush {}
/// Starts a syncer for the given sample file directory.
///
/// `db` must not be locked by the caller when this is called.
///
/// Run at most one syncer per directory (and none in read-only mode). The initial rotation
/// is performed synchronously, so it has finished before any file writing starts; all later
/// syncing happens on a background thread.
///
/// Returns a freely-cloneable `SyncerChannel` for sending commands, plus the `JoinHandle`
/// of the syncer thread. Commands sent on the channel are executed or retried forever.
/// (TODO: provide some manner of pushback during retry.) At program shutdown, drop every
/// `SyncerChannel` clone and then join the handle so that all recordings are persisted.
///
/// Note that "dropping every clone" currently includes calling
/// `LockedDatabase::clear_on_flush`, because this function installs an on-flush hook which
/// holds a sender of its own.
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
pub fn start_syncer<C>(
    db: Arc<db::Database<C>>,
    shutdown_rx: base::shutdown::Receiver,
    dir_id: i32,
) -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error>
where
    C: Clocks + Clone,
{
    let syncer_db = db.clone();
    let (mut syncer, path) = Syncer::new(&db.lock(), shutdown_rx, syncer_db, dir_id)?;
    let span = tracing::info_span!("syncer", path = %path.display());
    span.in_scope(|| {
        tracing::info!("initial rotation");
        syncer.initial_rotation()
    })?;

    // Every database flush pokes the syncer so that it attempts garbage collection.
    let (cmd_tx, cmd_rx) = mpsc::channel();
    let flush_tx = cmd_tx.clone();
    let notify_flushed = move || {
        if let Err(err) = flush_tx.send(SyncerCommand::DatabaseFlushed) {
            warn!(%err, "unable to notify syncer for dir {}", dir_id);
        }
    };
    db.lock().on_flush(Box::new(notify_flushed));

    // The worker loops until `Syncer::iter` asks it to stop.
    let worker = thread::Builder::new()
        .name(format!("sync-{dir_id}"))
        .spawn(move || {
            span.in_scope(|| {
                tracing::info!("starting");
                while syncer.iter(&cmd_rx) {}
            })
        })
        .unwrap();
    Ok((SyncerChannel(cmd_tx), worker))
}
/// A new retention limit for use in [lower_retention].
pub struct NewLimit {
/// Id of the stream whose recordings should be trimmed.
pub stream_id: i32,
/// The new `retain_bytes` limit to fit within; 0 deletes all of the stream's recordings.
pub limit: i64,
}
/// Immediately deletes recordings if necessary to fit within the given new `retain_bytes` limit.
/// Note this doesn't change the limit in the database; it only deletes files.
/// Pass a limit of 0 to delete all recordings associated with a camera.
///
/// This is expected to be performed from `moonfire-nvr config` when no syncer is running.
/// It potentially flushes the database twice (before and after the actual deletion).
pub fn lower_retention(
2023-07-01 13:46:13 -04:00
db: &Arc<db::Database>,
dir_id: i32,
limits: &[NewLimit],
) -> Result<(), Error> {
let db2 = db.clone();
let (_tx, rx) = base::shutdown::channel();
let (mut syncer, _) = Syncer::new(&db.lock(), rx, db2, dir_id)?;
syncer.do_rotation(|db| {
for l in limits {
let (fs_bytes_before, extra);
{
let Some(stream) = db.streams_by_id().get(&l.stream_id) else {
bail!(NotFound, msg("no such stream {}", l.stream_id));
};
fs_bytes_before =
stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete;
extra = stream.config.retain_bytes - l.limit;
}
if l.limit >= fs_bytes_before {
continue;
}
delete_recordings(db, l.stream_id, extra)?;
}
Ok(())
})
}
/// Enqueues deletion of recordings to bring a stream's disk usage within bounds.
/// The next flush will mark the recordings as garbage in the SQLite database, and then they can
/// be deleted from disk.
fn delete_recordings(
db: &mut db::LockedDatabase,
stream_id: i32,
extra_bytes_needed: i64,
) -> Result<(), Error> {
let fs_bytes_needed = {
let stream = match db.streams_by_id().get(&stream_id) {
None => bail!(NotFound, msg("no stream {stream_id}")),
Some(s) => s,
};
stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
- stream.config.retain_bytes
};
let mut fs_bytes_to_delete = 0;
if fs_bytes_needed <= 0 {
debug!(
"{}: have remaining quota of {}",
stream_id,
base::strutil::encode_size(fs_bytes_needed)
);
return Ok(());
}
let mut n = 0;
db.delete_oldest_recordings(stream_id, &mut |row| {
if fs_bytes_needed >= fs_bytes_to_delete {
fs_bytes_to_delete += db::round_up(i64::from(row.sample_file_bytes));
n += 1;
return true;
}
false
})?;
Ok(())
}
impl<F: FileWriter> SyncerChannel<F> {
    /// Hands a finished recording to the syncer, which asynchronously syncs and closes the
    /// file, records it into the database, and starts rotation.
    ///
    /// Panics if the syncer is no longer receiving commands.
    fn async_save_recording(&self, id: CompositeId, wall_duration: recording::Duration, f: F) {
        let cmd = SyncerCommand::AsyncSaveRecording(id, wall_duration, f);
        self.0.send(cmd).unwrap();
    }

    /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
    /// including the next scheduled database flush (if any). Note this doesn't wait for any
    /// post-database flush garbage collection.
    pub fn flush(&self) {
        let (done_tx, done_rx) = mpsc::sync_channel(0);
        self.0.send(SyncerCommand::Flush(done_tx)).unwrap();
        // The syncer never sends anything; it signals completion by dropping its end of
        // the channel, which surfaces here as a receive error.
        done_rx.recv().unwrap_err();
    }
}
/// Finds the files in `dir` that should be "abandoned" on opening: deleted without ever
/// having been recorded in the database.
///
/// `streams_to_next` maps each stream id to a recording id threshold; a file is abandoned
/// when its recording id is at or beyond the threshold for its stream.
fn list_files_to_abandon(
    dir: &dir::SampleFileDir,
    streams_to_next: FastHashMap<i32, i32>,
) -> Result<Vec<CompositeId>, Error> {
    let mut abandoned = Vec::new();
    let mut handle = dir.opendir()?;
    for entry in handle.iter() {
        let entry = entry?;
        // Skip names that don't parse as a recording id.
        let Ok(id) = dir::parse_id(entry.file_name().to_bytes()) else {
            continue;
        };
        // Skip files belonging to an unknown stream.
        let Some(&next) = streams_to_next.get(&id.stream()) else {
            continue;
        };
        if id.recording() >= next {
            abandoned.push(id);
        }
    }
    Ok(abandoned)
}
impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
fn new(
l: &db::LockedDatabase,
shutdown_rx: base::shutdown::Receiver,
db: Arc<db::Database<C>>,
dir_id: i32,
2021-10-26 14:47:13 -04:00
) -> Result<(Self, PathBuf), Error> {
let d = l
.sample_file_dirs_by_id()
.get(&dir_id)
.ok_or_else(|| err!(NotFound, msg("no dir {dir_id}")))?;
let dir = d.get()?;
// Abandon files.
// First, get a list of the streams in question.
let streams_to_next: FastHashMap<_, _> = l
.streams_by_id()
.iter()
.filter_map(|(&k, v)| {
if v.sample_file_dir_id == Some(dir_id) {
track cumulative duration and runs This is useful for a combo scrub bar-based UI (#32) + live view UI (#59) in a non-obvious way. When constructing a HTML Media Source Extensions API SourceBuffer, the caller can specify a "mode" of either "segments" or "sequence": In "sequence" mode, playback assumes segments are added sequentially. This is good enough for a live view-only UI (#59) but not for a scrub bar UI in which you may want to seek backward to a segment you've never seen before. You will then need to insert a segment out-of-sequence. Imagine what happens when the user goes forward again until the end of the segment inserted immediately before it. The user should see the chronologically next segment or a pause for loading if it's unavailable. The best approximation of this is to track the mapping of timestamps to segments and insert a VTTCue with an enter/exit handler that seeks to the right position. But seeking isn't instantaneous; the user will likely briefly see first the segment they seeked to before. That's janky. Additionally, the "canplaythrough" event will behave strangely. In "segments" mode, playback respects the timestamps we set: * The obvious choice is to use wall clock timestamps. This is fine if they're known to be fixed and correct. They're not. The currently-recording segment may be "unanchored", meaning its start timestamp is not yet fixed. Older timestamps may overlap if the system clock was stepped between runs. The latter isn't /too/ bad from a user perspective, though it's confusing as a developer. We probably will only end up showing the more recent recording for a given timestamp anyway. But the former is quite annoying. It means we have to throw away part of the SourceBuffer that we may want to seek back (causing UI pauses when that happens) or keep our own spare copy of it (memory bloat). I'd like to avoid the whole mess. * Another approach is to use timestamps that are guaranteed to be in the correct order but that may have gaps. 
In particular, a timestamp of (recording_id * max_recording_duration) + time_within_recording. But again seeking isn't instantaneous. In my experiments, there's a visible pause between segments that drives me nuts. * Finally, the approach that led me to this schema change. Use timestamps that place each segment after the one before, possibly with an intentional gap between runs (to force a wait where we have an actual gap). This should make the browser's natural playback behavior work properly: it never goes to an incorrect place, and it only waits when/if we want it to. We have to maintain a mapping between its timestamps and segment ids but that's doable. This commit is only the schema change; the new data aren't exposed in the API yet, much less used by a UI. Note that stream.next_recording_id became stream.cum_recordings. I made a slight definition change in the process: recording ids for new streams start at 0 rather than 1. Various tests changed accordingly. The upgrade process makes a best effort to backfill these new fields, but of course it doesn't know the total duration or number of runs of previously deleted rows. That's good enough.
2020-06-09 19:17:32 -04:00
Some((k, v.cum_recordings))
} else {
None
}
})
.collect();
let to_abandon = list_files_to_abandon(&dir, streams_to_next)?;
let mut undeletable = 0;
for &id in &to_abandon {
if let Err(err) = dir.unlink_file(id) {
if err == nix::Error::ENOENT {
warn!(%id, "dir: abandoned recording already deleted");
} else {
warn!(%err, %id, "dir: unable to unlink abandoned recording");
undeletable += 1;
}
}
}
if undeletable > 0 {
bail!(
Unknown,
msg("unable to delete {undeletable} abandoned recordings; see logs")
);
}
Ok((
Syncer {
dir_id,
shutdown_rx,
dir,
db,
planned_flushes: std::collections::BinaryHeap::new(),
},
d.path.clone(),
))
}
/// Rotates files for all streams and deletes stale files from previous runs.
/// Called from main thread.
fn initial_rotation(&mut self) -> Result<(), Error> {
self.do_rotation(|db| {
2021-05-17 17:31:50 -04:00
let streams: Vec<i32> = db.streams_by_id().keys().copied().collect();
for &stream_id in &streams {
delete_recordings(db, stream_id, 0)?;
}
Ok(())
})
}
/// Helper to do initial or retention-lowering rotation. Called from main thread.
fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
where
F: Fn(&mut db::LockedDatabase) -> Result<(), Error>,
{
{
let mut db = self.db.lock();
2023-01-28 14:59:21 -05:00
delete_recordings(&mut db)?;
db.flush("synchronous deletion")?;
}
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
2021-05-17 17:31:50 -04:00
d.garbage_needs_unlink.iter().copied().collect()
};
if !garbage.is_empty() {
// Try to delete files; retain ones in `garbage` that don't exist.
let mut errors = 0;
for &id in &garbage {
if let Err(err) = self.dir.unlink_file(id) {
if err != nix::Error::ENOENT {
warn!(%err, "dir: unable to unlink {}", id);
errors += 1;
}
}
}
if errors > 0 {
bail!(
Unknown,
msg("unable to unlink {errors} files (see earlier warning messages for details)"),
);
}
self.dir.sync()?;
self.db.lock().delete_garbage(self.dir_id, &mut garbage)?;
self.db.lock().flush("synchronous garbage collection")?;
}
Ok(())
}
}
impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
/// Processes a single command or timeout.
///
/// Returns true iff the loop should continue.
fn iter(&mut self, cmds: &mpsc::Receiver<SyncerCommand<D::File>>) -> bool {
// Wait for a command, the next flush timeout (if specified), or channel disconnect.
let next_flush = self.planned_flushes.peek().map(|f| f.when);
let cmd = match next_flush {
None => match cmds.recv() {
Err(_) => return false, // all cmd senders are gone.
Ok(cmd) => cmd,
},
Some(t) => {
let now = self.db.clocks().monotonic();
// Calculate the timeout to use, mapping negative durations to 0.
2021-05-17 17:31:50 -04:00
let timeout = (t - now)
.to_std()
.unwrap_or_else(|_| StdDuration::new(0, 0));
match self.db.clocks().recv_timeout(cmds, timeout) {
Err(mpsc::RecvTimeoutError::Disconnected) => return false, // cmd senders gone.
Err(mpsc::RecvTimeoutError::Timeout) => {
self.flush();
return true;
}
Ok(cmd) => cmd,
}
}
};
// Have a command; handle it.
match cmd {
SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => {
if self.save(id, wall_dur, f).is_err() {
return false;
}
}
SyncerCommand::DatabaseFlushed => {
if self.collect_garbage().is_err() {
return false;
}
}
SyncerCommand::Flush(flush) => {
// The sender is waiting for the supplied writer to be dropped. If there's no
// timeout, do so immediately; otherwise wait for that timeout then drop it.
if let Some(mut f) = self.planned_flushes.peek_mut() {
f.senders.push(flush);
}
}
};
true
}
/// Collects garbage (without forcing a sync). Called from worker thread.
fn collect_garbage(&mut self) -> Result<(), ShutdownError> {
trace!("Collecting garbage");
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
2021-05-17 17:31:50 -04:00
d.garbage_needs_unlink.iter().copied().collect()
};
if garbage.is_empty() {
return Ok(());
}
let c = &self.db.clocks();
for &id in &garbage {
clock::retry(c, &self.shutdown_rx, &mut || {
if let Err(e) = self.dir.unlink_file(id) {
if e == nix::Error::ENOENT {
warn!("dir: recording {} already deleted!", id);
return Ok(());
}
return Err(e);
}
Ok(())
})?;
}
clock::retry(c, &self.shutdown_rx, &mut || self.dir.sync())?;
clock::retry(c, &self.shutdown_rx, &mut || {
self.db.lock().delete_garbage(self.dir_id, &mut garbage)
})?;
Ok(())
}
/// Saves the given recording and prompts rotation. Called from worker thread.
/// Note that this doesn't flush immediately; SQLite transactions are batched to lower SSD
/// wear. On the next flush, the old recordings will actually be marked as garbage in the
/// database, and shortly afterward actually deleted from disk.
fn save(
&mut self,
id: CompositeId,
wall_duration: recording::Duration,
f: D::File,
) -> Result<(), ShutdownError> {
trace!("Processing save for {}", id);
let stream_id = id.stream();
// Free up a like number of bytes.
clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || f.sync_all())?;
clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || {
self.dir.sync()
})?;
let mut db = self.db.lock();
db.mark_synced(id).unwrap();
delete_recordings(&mut db, stream_id, 0).unwrap();
let s = db.streams_by_id().get(&stream_id).unwrap();
let c = db.cameras_by_id().get(&s.camera_id).unwrap();
// Schedule a flush.
let how_soon =
Duration::seconds(i64::from(s.config.flush_if_sec)) - wall_duration.to_tm_duration();
let now = self.db.clocks().monotonic();
2019-01-04 14:56:15 -05:00
let when = now + how_soon;
let reason = format!(
"{} sec after start of {} {}-{} recording {}",
s.config.flush_if_sec,
wall_duration,
c.short_name,
s.type_.as_str(),
id
);
trace!("scheduling flush in {} because {}", how_soon, &reason);
2019-01-04 14:56:15 -05:00
self.planned_flushes.push(PlannedFlush {
when,
reason,
recording: id,
senders: Vec::new(),
2019-01-04 14:56:15 -05:00
});
Ok(())
}
/// Flushes the database if necessary to honor `flush_if_sec` for some recording.
/// Called from worker thread when one of the `planned_flushes` arrives.
2019-01-04 14:56:15 -05:00
fn flush(&mut self) {
trace!("Flushing");
2019-01-04 14:56:15 -05:00
let mut l = self.db.lock();
// Look through the planned flushes and see if any are still relevant. It's possible
// they're not because something else (e.g., a syncer for a different sample file dir)
// has flushed the database in the meantime.
use std::collections::binary_heap::PeekMut;
while let Some(f) = self.planned_flushes.peek_mut() {
let s = match l.streams_by_id().get(&f.recording.stream()) {
Some(s) => s,
None => {
// Removing streams while running hasn't been implemented yet, so this should
// be impossible.
warn!(
"bug: no stream for {} which was scheduled to be flushed",
f.recording
);
2019-01-04 14:56:15 -05:00
PeekMut::pop(f);
continue;
}
};
if s.cum_recordings <= f.recording.recording() {
// not yet committed.
2019-01-04 14:56:15 -05:00
break;
}
trace!("planned flush ({}) no longer needed", &f.reason);
PeekMut::pop(f);
}
// If there's anything left to do now, try to flush.
let f = match self.planned_flushes.peek() {
None => return,
Some(f) => f,
};
let now = self.db.clocks().monotonic();
if f.when > now {
return;
}
if let Err(e) = l.flush(&f.reason) {
let d = Duration::minutes(1);
warn!(
"flush failure on save for reason {}; will retry after {}: {:?}",
f.reason, d, e
);
self.planned_flushes
.peek_mut()
.expect("planned_flushes is non-empty")
.when = self.db.clocks().monotonic() + Duration::minutes(1);
2019-01-04 14:56:15 -05:00
return;
}
2019-01-04 14:56:15 -05:00
// A successful flush should take care of everything planned.
self.planned_flushes.clear();
}
}
/// Struct for writing a single run (of potentially several recordings) to disk and committing its
/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
/// or panics (if further writing on this stream is impossible).
pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
dir: &'a D,
db: &'a db::Database<C>,
channel: &'a SyncerChannel<D::File>,
stream_id: i32,
state: WriterState<D::File>,
}
2023-01-28 14:59:21 -05:00
/// The state of a [Writer] with respect to the run's current recording.
// clippy points out that the `Open` variant is significantly larger and
// suggests boxing it. There's no benefit to this given that we don't have a lot
// of `WriterState`s active at once, and they should cycle between `Open` and
// `Closed`.
#[allow(clippy::large_enum_variant)]
enum WriterState<F: FileWriter> {
/// No recording is open and there is no previous recording to continue from; the next
/// recording opened starts a new run (`run_offset` 0).
Unopened,
/// A recording is open and accepting samples.
Open(InnerWriter<F>),
/// The previous recording was closed; holds what the run's next recording needs from it.
Closed(PreviousWriter),
}
/// State for writing a single recording, used within [Writer].
///
/// Note that the recording created by every `InnerWriter` must be written to the [SyncerChannel]
/// with at least one sample. The sample may have zero duration.
struct InnerWriter<F: FileWriter> {
/// The sample file being written.
f: F,
/// The recording's database row under construction, as returned by `add_recording`.
r: Arc<Mutex<db::RecordingToInsert>>,
/// Encoder which adds each indexed sample to `r`.
e: recording::SampleIndexEncoder,
/// Id of this recording.
id: CompositeId,
/// The video sample entry id this recording was opened with; `Writer::open` rejects a
/// different one while the recording is open.
video_sample_entry_id: i32,
/// Running hash of every packet written; finalized into `sample_file_blake3` on close.
hasher: blake3::Hasher,
/// The start time of this recording, based solely on examining the local clock after frames in
/// this recording were received. Frames can suffer from various kinds of delay (initial
/// buffering, encoding, and network transmission), so this time is set to far in the future on
/// construction, given a real value on the first packet, and decreased as less-delayed packets
/// are discovered. See design/time.md for details.
local_start: recording::Time,
/// A sample which has been written to disk but not added to `index`. Index writes are one
/// sample behind disk writes because the duration of a sample is the difference between its
/// pts and the next sample's pts. A sample is flushed when the next sample is written, when
/// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
///
/// `unindexed_sample` should always be `Some`, except when a `write` call has aborted on
/// shutdown. In that case, the close will be unable to write the full segment.
unindexed_sample: Option<UnindexedSample>,
}
/// A sample which has been written to disk but not included in the index yet.
/// The index includes the sample's duration, which is calculated from the
/// _following_ sample's pts, so the most recent sample is always unindexed.
#[derive(Copy, Clone)]
struct UnindexedSample {
/// The local clock's time as of when the packet was received.
local_time: recording::Time,
pts_90k: i64, // relative to the start of the run, not a single recording.
/// Length of the packet, in bytes.
len: i32,
/// The `is_key` flag supplied with the packet.
is_key: bool,
}
/// State associated with a run's previous recording; used within [Writer].
#[derive(Copy, Clone)]
struct PreviousWriter {
/// End time of the previous recording (its start plus its wall duration); used as the
/// start time of the run's next recording.
end: recording::Time,
/// The previous recording's offset within the run; the next recording uses one more.
run_offset: i32,
}
impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
    /// Creates a writer for `stream_id` with no recording open yet.
    ///
    /// `db` must not be locked.
    pub fn new(
        dir: &'a D,
        db: &'a db::Database<C>,
        channel: &'a SyncerChannel<D::File>,
        stream_id: i32,
    ) -> Self {
        Writer {
            dir,
            db,
            channel,
            stream_id,
            state: WriterState::Unopened,
        }
    }

    /// Opens a new recording if not already open.
    ///
    /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
    /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
    /// correcting this.
    ///
    /// # Errors
    ///
    /// * `Internal` if a recording is already open with a different `video_sample_entry_id`.
    /// * Any error from adding the recording to the database.
    /// * `Cancelled` if creating the sample file was abandoned (the retry gave up).
    fn open(
        &mut self,
        shutdown_rx: &mut base::shutdown::Receiver,
        video_sample_entry_id: i32,
    ) -> Result<(), Error> {
        let prev = match self.state {
            WriterState::Unopened => None,
            WriterState::Open(ref o) => {
                if o.video_sample_entry_id != video_sample_entry_id {
                    bail!(Internal, msg("inconsistent video_sample_entry_id"));
                }
                return Ok(());
            }
            WriterState::Closed(prev) => Some(prev),
        };
        let (id, r) = self.db.lock().add_recording(
            self.stream_id,
            db::RecordingToInsert {
                run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
                // A run's first recording has no predecessor to anchor its start time to, so
                // it starts "far in the future" until samples supply a real value.
                start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::MAX)),
                video_sample_entry_id,
                flags: db::RecordingFlags::Growing as i32,
                ..Default::default()
            },
        )?;
        let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || {
            self.dir.create_file(id)
        })
        .map_err(|e| err!(Cancelled, source(e)))?;

        self.state = WriterState::Open(InnerWriter {
            f,
            r,
            e: recording::SampleIndexEncoder::default(),
            id,
            hasher: blake3::Hasher::new(),
            local_start: recording::Time(i64::MAX),
            unindexed_sample: None,
            video_sample_entry_id,
        });
        Ok(())
    }

    /// Returns whether this writer has previously opened (and since closed) a recording.
    ///
    /// # Errors
    ///
    /// Returns `Internal` if a recording is currently open.
    pub fn previously_opened(&self) -> Result<bool, Error> {
        Ok(match self.state {
            WriterState::Unopened => false,
            WriterState::Closed(_) => true,
            WriterState::Open(_) => bail!(Internal, msg("open!")),
        })
    }

    /// Writes a new frame to this recording.
    /// `local_time` should be the local clock's time as of when this packet was received.
    ///
    /// Opens a recording first if none is open. The previously written sample (if any) is
    /// indexed now that its duration is known, then `pkt` is written to the sample file and
    /// becomes the new unindexed sample.
    ///
    /// # Errors
    ///
    /// * Any error from opening the recording.
    /// * `InvalidArgument` if `pts_90k` doesn't exceed the previous sample's pts, or exceeds
    ///   it by more than fits in an `i32`.
    /// * Any error from indexing the previous sample.
    /// * `Cancelled` if writing the packet was abandoned (the retry gave up).
    ///
    /// # Panics
    ///
    /// Panics if `pkt` is longer than `i32::MAX` bytes.
    pub fn write(
        &mut self,
        shutdown_rx: &mut base::shutdown::Receiver,
        pkt: &[u8],
        local_time: recording::Time,
        pts_90k: i64,
        is_key: bool,
        video_sample_entry_id: i32,
    ) -> Result<(), Error> {
        self.open(shutdown_rx, video_sample_entry_id)?;
        let w = match self.state {
            WriterState::Open(ref mut w) => w,
            _ => unreachable!(),
        };

        // Note w's invariant that `unindexed_sample` is `Some` may currently be violated.
        // We must restore it on all success or error paths (other than an aborted write,
        // which deliberately leaves it `None`).
        if let Some(unindexed) = w.unindexed_sample.take() {
            let duration = pts_90k - unindexed.pts_90k;
            if duration <= 0 {
                w.unindexed_sample = Some(unindexed); // restore invariant.
                bail!(
                    InvalidArgument,
                    msg(
                        "pts not monotonically increasing; got {} then {}",
                        unindexed.pts_90k,
                        pts_90k,
                    ),
                );
            }
            let duration = match i32::try_from(duration) {
                Ok(d) => d,
                Err(_) => {
                    w.unindexed_sample = Some(unindexed); // restore invariant.
                    bail!(
                        InvalidArgument,
                        msg(
                            "excessive pts jump from {} to {}",
                            unindexed.pts_90k,
                            pts_90k,
                        ),
                    )
                }
            };
            if let Err(e) = w.add_sample(
                duration,
                unindexed.len,
                unindexed.is_key,
                unindexed.local_time,
                self.db,
                self.stream_id,
            ) {
                w.unindexed_sample = Some(unindexed); // restore invariant.
                return Err(e);
            }
        }

        // `FileWriter::write` may accept only part of the buffer, so loop until it's all out.
        let mut remaining = pkt;
        while !remaining.is_empty() {
            let written =
                match clock::retry(&self.db.clocks(), shutdown_rx, &mut || w.f.write(remaining)) {
                    Ok(n) => n,
                    Err(e) => {
                        // close() will do nothing because unindexed_sample will be None.
                        tracing::warn!(
                            "abandoning incompletely written recording {} on shutdown",
                            w.id
                        );
                        bail!(Cancelled, source(e));
                    }
                };
            remaining = &remaining[written..];
        }
        w.unindexed_sample = Some(UnindexedSample {
            local_time,
            pts_90k,
            len: i32::try_from(pkt.len()).unwrap(),
            is_key,
        });
        w.hasher.update(pkt);
        Ok(())
    }

    /// Cleanly closes a single recording within this writer, using a supplied
    /// pts of the next sample for the last sample's duration (if known).
    ///
    /// The `Writer` may be used again, causing another recording to be created
    /// within the same run.
    ///
    /// If the `Writer` is dropped without `close`, the `Drop` trait impl will
    /// close, swallowing errors and using a zero duration for the last sample.
    ///
    /// Does nothing if no recording is open. If closing the open recording fails, the error
    /// is returned and the writer is left in the `Unopened` state.
    pub fn close(&mut self, next_pts: Option<i64>, reason: Option<String>) -> Result<(), Error> {
        self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
            WriterState::Open(w) => {
                let prev = w.close(self.channel, next_pts, self.db, self.stream_id, reason)?;
                WriterState::Closed(prev)
            }
            s => s,
        };
        Ok(())
    }
}
/// Restricts `v` to `min..=max`: the lower bound is applied first, then the upper bound.
/// (Unlike `i64::clamp`, this never panics; if `min > max`, the result is `max`.)
fn clamp(v: i64, min: i64, max: i64) -> i64 {
    let at_least_min = v.max(min);
    at_least_min.min(max)
}
impl<F: FileWriter> InnerWriter<F> {
    /// Adds a sample (already written to disk) to the recording's index, updating the
    /// recording's start time and wall duration, then announces the new frame to live viewers.
    ///
    /// # Errors
    ///
    /// Returns `OutOfRange`, without modifying the recording, if the resulting wall duration
    /// would exceed `MAX_RECORDING_WALL_DURATION`.
    ///
    /// # Panics
    ///
    /// Panics if the recording's mutex is poisoned or if `send_live_segment` fails.
    fn add_sample<C: Clocks + Clone>(
        &mut self,
        duration_90k: i32,
        bytes: i32,
        is_key: bool,
        pkt_local_time: recording::Time,
        db: &db::Database<C>,
        stream_id: i32,
    ) -> Result<(), Error> {
        let mut l = self.r.lock().unwrap();

        // design/time.md explains these time manipulations in detail.
        let prev_media_duration_90k = l.media_duration_90k;
        let media_duration_90k = l.media_duration_90k + duration_90k;
        let local_start = cmp::min(
            self.local_start,
            pkt_local_time - recording::Duration(i64::from(media_duration_90k)),
        );
        let limit = i64::from(media_duration_90k / 2000); // 1/2000th, aka 500 ppm.
        let start = if l.run_offset == 0 {
            // Start time isn't anchored to previous recording's end; adjust.
            local_start
        } else {
            l.start
        };

        // The wall duration may deviate from the media duration by at most `limit`.
        let wall_duration_90k = media_duration_90k
            + i32::try_from(clamp(local_start.0 - start.0, -limit, limit)).unwrap();
        if wall_duration_90k > i32::try_from(MAX_RECORDING_WALL_DURATION).unwrap() {
            bail!(
                OutOfRange,
                msg("Duration {wall_duration_90k} exceeds maximum {MAX_RECORDING_WALL_DURATION}"),
            );
        }
        l.wall_duration_90k = wall_duration_90k;
        l.start = start;
        self.local_start = local_start;
        self.e.add_sample(duration_90k, bytes, is_key, &mut l);

        // Release the recording's lock before taking the database's.
        drop(l);
        db.lock()
            .send_live_segment(
                stream_id,
                db::LiveFrame {
                    recording: self.id.recording(),
                    is_key,
                    media_off_90k: prev_media_duration_90k..media_duration_90k,
                },
            )
            .unwrap();
        Ok(())
    }

    /// Finishes the recording: indexes the final sample, fills in the recording's closing
    /// metadata, and hands the file to the syncer for saving.
    ///
    /// `next_pts`, if known, determines the final sample's duration; otherwise the sample
    /// gets a zero duration and the recording is flagged `TrailingZero`.
    ///
    /// Returns the state the run's next recording needs.
    ///
    /// # Errors
    ///
    /// * `FailedPrecondition` if there's no unindexed sample (a write was aborted).
    /// * `OutOfRange` if `next_pts` yields a duration which doesn't fit in an `i32`.
    /// * Any error from [InnerWriter::add_sample].
    fn close<C: Clocks + Clone>(
        mut self,
        channel: &SyncerChannel<F>,
        next_pts: Option<i64>,
        db: &db::Database<C>,
        stream_id: i32,
        reason: Option<String>,
    ) -> Result<PreviousWriter, Error> {
        let unindexed = self.unindexed_sample.take().ok_or_else(|| {
            err!(
                FailedPrecondition,
                msg(
                    "unable to add recording {} to database due to aborted write",
                    self.id,
                ),
            )
        })?;
        let (last_sample_duration, flags) = match next_pts {
            None => (0, db::RecordingFlags::TrailingZero as i32),
            Some(p) => (
                i32::try_from(p - unindexed.pts_90k).map_err(|_| {
                    err!(
                        OutOfRange,
                        msg(
                            "pts {} following {} creates invalid duration",
                            p,
                            unindexed.pts_90k
                        )
                    )
                })?,
                0,
            ),
        };
        let blake3 = self.hasher.finalize();

        // This always ends a live segment.
        self.add_sample(
            last_sample_duration,
            unindexed.len,
            unindexed.is_key,
            unindexed.local_time,
            db,
            stream_id,
        )?;

        let (wall_duration, run_offset, end) = {
            let mut l = self.r.lock().unwrap();
            // Assigning `flags` replaces the `Growing` flag the recording was opened with.
            l.flags = flags;
            l.local_time_delta = self.local_start - l.start;
            l.sample_file_blake3 = Some(*blake3.as_bytes());
            l.end_reason = reason;
            let wall_duration = recording::Duration(i64::from(l.wall_duration_90k));
            (wall_duration, l.run_offset, l.start + wall_duration)
        };
        drop(self.r);
        channel.async_save_recording(self.id, wall_duration, self.f);
        Ok(PreviousWriter { end, run_offset })
    }
}
/// Closes a still-open recording when the writer goes away, giving the final sample a zero
/// duration and recording "drop" as the end reason.
impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> {
    fn drop(&mut self) {
        // Closing during an unwind would probably just panic again, so don't attempt it.
        if ::std::thread::panicking() {
            return;
        }
        let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) else {
            return;
        };
        // The error is deliberately discarded: a caller drops the writer without calling
        // close() only after some earlier error, which it should already be reporting.
        let end_reason = Some("drop".to_owned());
        let _ = w.close(self.channel, None, self.db, self.stream_id, end_reason);
    }
}
#[cfg(test)]
mod tests {
use super::Writer;
use crate::db::{self, CompositeId, VideoSampleEntryToInsert};
use crate::recording;
use crate::testutil;
use base::clock::{Clocks, SimulatedClocks};
use std::collections::VecDeque;
use std::io;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::{trace, warn};
#[derive(Clone)]
struct MockDir(Arc<Mutex<VecDeque<MockDirAction>>>);
enum MockDirAction {
Create(
CompositeId,
Box<dyn Fn(CompositeId) -> Result<MockFile, nix::Error> + Send>,
),
2019-07-12 00:59:01 -04:00
Sync(Box<dyn Fn() -> Result<(), nix::Error> + Send>),
Unlink(
CompositeId,
Box<dyn Fn(CompositeId) -> Result<(), nix::Error> + Send>,
),
}
impl MockDir {
fn new() -> Self {
MockDir(Arc::new(Mutex::new(VecDeque::new())))
}
fn expect(&self, action: MockDirAction) {
self.0.lock().unwrap().push_back(action);
}
fn ensure_done(&self) {
assert_eq!(self.0.lock().unwrap().len(), 0);
}
}
impl super::DirWriter for MockDir {
type File = MockFile;
2019-07-12 00:59:01 -04:00
fn create_file(&self, id: CompositeId) -> Result<Self::File, nix::Error> {
match self
.0
.lock()
.unwrap()
.pop_front()
.expect("got create_file with no expectation")
{
MockDirAction::Create(expected_id, ref f) => {
assert_eq!(id, expected_id);
f(id)
}
_ => panic!("got create_file({id}), expected something else"),
}
}
2019-07-12 00:59:01 -04:00
fn sync(&self) -> Result<(), nix::Error> {
match self
.0
.lock()
.unwrap()
.pop_front()
.expect("got sync with no expectation")
{
MockDirAction::Sync(f) => f(),
_ => panic!("got sync, expected something else"),
}
}
2019-07-12 00:59:01 -04:00
fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error> {
match self
.0
.lock()
.unwrap()
.pop_front()
.expect("got unlink_file with no expectation")
{
MockDirAction::Unlink(expected_id, f) => {
assert_eq!(id, expected_id);
f(id)
}
_ => panic!("got unlink({id}), expected something else"),
}
}
}
impl Drop for MockDir {
    /// Fails the test if expectations are left over, unless a panic is already unwinding.
    fn drop(&mut self) {
        if ::std::thread::panicking() {
            return;
        }
        let queue = self.0.lock().unwrap();
        assert_eq!(queue.len(), 0);
    }
}
/// Scripted stand-in for a sample file.
///
/// Wraps a shared FIFO queue of expected actions; every clone refers to the same queue.
#[derive(Clone)]
struct MockFile(Arc<Mutex<VecDeque<MockFileAction>>>);

/// One expected call on a [`MockFile`], paired with the closure that produces its result.
enum MockFileAction {
    /// An expected `sync_all` call.
    SyncAll(Box<dyn Fn() -> Result<(), io::Error> + Send>),
    /// An expected `write` call; the closure receives the buffer passed to `write`.
    Write(Box<dyn Fn(&[u8]) -> Result<usize, io::Error> + Send>),
}

impl MockFile {
    /// Creates a mock file with no expectations queued.
    fn new() -> Self {
        Self(Arc::default())
    }

    /// Appends `action` to the end of the expectation queue.
    fn expect(&self, action: MockFileAction) {
        let mut queue = self.0.lock().unwrap();
        queue.push_back(action);
    }

    /// Panics unless every queued expectation has been consumed.
    fn ensure_done(&self) {
        let queue = self.0.lock().unwrap();
        assert_eq!(queue.len(), 0);
    }
}
impl super::FileWriter for MockFile {
    /// Consumes the next expectation, which must be a `SyncAll`, and returns the result of
    /// its closure. Panics on an empty queue or a different expectation.
    fn sync_all(&self) -> Result<(), io::Error> {
        // The queue stays locked while the scripted closure runs.
        let mut queue = self.0.lock().unwrap();
        let next = queue
            .pop_front()
            .expect("got sync_all with no expectation");
        match next {
            MockFileAction::SyncAll(respond) => respond(),
            _ => panic!("got sync_all, expected something else"),
        }
    }

    /// Consumes the next expectation, which must be a `Write`, and returns the result of
    /// calling its closure with `buf`. Panics on an empty queue or a different expectation.
    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        // The queue stays locked while the scripted closure runs.
        let mut queue = self.0.lock().unwrap();
        let next = queue.pop_front().expect("got write with no expectation");
        match next {
            MockFileAction::Write(respond) => respond(buf),
            _ => panic!("got write({buf:?}), expected something else"),
        }
    }
}
struct Harness {
db: Arc<db::Database<SimulatedClocks>>,
dir_id: i32,
2021-05-17 16:08:01 -04:00
_tmpdir: ::tempfile::TempDir,
dir: MockDir,
channel: super::SyncerChannel<MockFile>,
_shutdown_tx: base::shutdown::Sender,
shutdown_rx: base::shutdown::Receiver,
syncer: super::Syncer<SimulatedClocks, MockDir>,
syncer_rx: mpsc::Receiver<super::SyncerCommand<MockFile>>,
}
/// Builds a [`Harness`]: a test database with simulated clocks starting at zero, plus a
/// syncer backed by a [`MockDir`] that the test steps manually.
///
/// `flush_if_sec` is forwarded to `testutil::TestDb::new_with_flush_if_sec`.
fn new_harness(flush_if_sec: u32) -> Harness {
    let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
    let tdb = testutil::TestDb::new_with_flush_if_sec(clocks, flush_if_sec);
    let dir_id = *tdb
        .db
        .lock()
        .sample_file_dirs_by_id()
        .keys()
        .next()
        .unwrap();

    // This starts a real fs-backed syncer. Get rid of it.
    tdb.db.lock().clear_on_flush();
    drop(tdb.syncer_channel);
    tdb.syncer_join.join().unwrap();

    // Start a mock syncer.
    let dir = MockDir::new();
    let (shutdown_tx, shutdown_rx) = base::shutdown::channel();
    let syncer = super::Syncer {
        // Reuse the id looked up above rather than locking the database again.
        dir_id,
        dir: dir.clone(),
        db: tdb.db.clone(),
        planned_flushes: std::collections::BinaryHeap::new(),
        shutdown_rx: shutdown_rx.clone(),
    };
    let (syncer_tx, syncer_rx) = mpsc::channel();
    // Notify the mock syncer of every database flush, as a real syncer would be.
    tdb.db.lock().on_flush(Box::new({
        let snd = syncer_tx.clone();
        move || {
            if let Err(err) = snd.send(super::SyncerCommand::DatabaseFlushed) {
                warn!(%err, "unable to notify syncer for dir {} of flush", dir_id);
            }
        }
    }));
    Harness {
        dir_id,
        dir,
        db: tdb.db,
        _tmpdir: tdb.tmpdir,
        channel: super::SyncerChannel(syncer_tx),
        _shutdown_tx: shutdown_tx,
        shutdown_rx,
        syncer,
        syncer_rx,
    }
}
/// Builds the generic I/O error that mock file closures return to simulate a failed call.
fn eio() -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, "got EIO")
}
/// Checks that a write whose pts is more than `i32::MAX` past the previous frame is
/// rejected with an "excessive pts jump" error, then drives the syncer through the save
/// triggered by dropping the writer.
#[test]
fn excessive_pts_jump() {
    testutil::init();
    let mut h = new_harness(0);
    let video_sample_entry_id =
        h.db.lock()
            .insert_video_sample_entry(VideoSampleEntryToInsert {
                width: 1920,
                height: 1080,
                pasp_h_spacing: 1,
                pasp_v_spacing: 1,
                data: [0u8; 100].to_vec(),
                rfc6381_codec: "avc1.000000".to_owned(),
            })
            .unwrap();
    let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);

    // The first create_file attempt fails with EIO; the second one succeeds.
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new(|_id| Err(nix::Error::EIO)),
    ));
    let f = MockFile::new();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new({
            let f = f.clone();
            move |_id| Ok(f.clone())
        }),
    ));
    f.expect(MockFileAction::Write(Box::new(|_| Ok(1))));
    f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    w.write(
        &mut h.shutdown_rx,
        b"1",
        recording::Time(1),
        0,
        true,
        video_sample_entry_id,
    )
    .unwrap();

    // One past the largest pts delta that fits in an i32.
    let e = w
        .write(
            &mut h.shutdown_rx,
            b"2",
            recording::Time(2),
            i64::from(i32::MAX) + 1,
            true,
            video_sample_entry_id,
        )
        .unwrap_err();
    assert!(e.to_string().contains("excessive pts jump"));
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    drop(w);
    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    f.ensure_done();
    h.dir.ensure_done();
}
/// Exercises a database flush that happens while the syncer is still handling an earlier
/// flush event.
#[test]
fn double_flush() {
    testutil::init();
    let mut h = new_harness(0);
    let retention = db::RetentionChange {
        stream_id: testutil::TEST_STREAM_ID,
        new_record: true,
        new_limit: 0,
    };
    h.db.lock().update_retention(&[retention]).unwrap();

    // Setup: add a 3-byte recording.
    let entry = VideoSampleEntryToInsert {
        width: 1920,
        height: 1080,
        pasp_h_spacing: 1,
        pasp_v_spacing: 1,
        data: vec![0u8; 100],
        rfc6381_codec: String::from("avc1.000000"),
    };
    let video_sample_entry_id = h.db.lock().insert_video_sample_entry(entry).unwrap();
    let mut writer = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
    let file = MockFile::new();
    let handle = file.clone();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new(move |_id| Ok(handle.clone())),
    ));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"123");
        Ok(3)
    })));
    file.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    writer
        .write(
            &mut h.shutdown_rx,
            b"123",
            recording::Time(2),
            0,
            true,
            video_sample_entry_id,
        )
        .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    writer.close(Some(1), None).unwrap();
    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    file.ensure_done();
    h.dir.ensure_done();

    // Then a 1-byte recording.
    let file = MockFile::new();
    let handle = file.clone();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 1),
        Box::new(move |_id| Ok(handle.clone())),
    ));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"4");
        Ok(1)
    })));
    file.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    writer
        .write(
            &mut h.shutdown_rx,
            b"4",
            recording::Time(3),
            1,
            true,
            video_sample_entry_id,
        )
        .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    let other_db = h.db.clone();
    h.dir.expect(MockDirAction::Unlink(
        CompositeId::new(1, 0),
        Box::new(move |_| {
            // Dropping the writer below should cause the old recording to be deleted
            // (moved to garbage). Once the database is flushed, the syncer forces garbage
            // collection, which includes this unlink.
            // Flush the database again from in here, as if another syncer had done it.
            other_db.lock().flush("another syncer running").unwrap();
            Ok(())
        }),
    ));
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    drop(writer);

    trace!("expecting AsyncSave");
    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    trace!("expecting planned flush");
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    trace!("expecting DatabaseFlushed");
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    trace!("expecting DatabaseFlushed again");
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed again
    file.ensure_done();
    h.dir.ensure_done();

    // Garbage should be marked collected on the next database flush.
    {
        let mut locked = h.db.lock();
        let before = locked.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
        assert!(before.garbage_needs_unlink.is_empty());
        assert!(!before.garbage_unlinked.is_empty());
        locked.flush("forced gc").unwrap();
        let after = locked.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
        assert!(after.garbage_needs_unlink.is_empty());
        assert!(after.garbage_unlinked.is_empty());
    }
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

    // The syncer should shut down cleanly.
    drop(h.channel);
    h.db.lock().clear_on_flush();
    assert_eq!(
        h.syncer_rx.try_recv().err(),
        Some(mpsc::TryRecvError::Disconnected)
    );
    assert!(h.syncer.planned_flushes.is_empty());
}
/// Checks that file creation, writes, file syncs and directory syncs are each retried
/// after an error, and that the recording's 4 bytes end up accounted for in the stream.
#[test]
fn write_path_retries() {
    testutil::init();
    let mut h = new_harness(0);
    let entry = VideoSampleEntryToInsert {
        width: 1920,
        height: 1080,
        pasp_h_spacing: 1,
        pasp_v_spacing: 1,
        data: vec![0u8; 100],
        rfc6381_codec: String::from("avc1.000000"),
    };
    let video_sample_entry_id = h.db.lock().insert_video_sample_entry(entry).unwrap();
    let mut writer = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);

    // File creation: one failure, then success.
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new(|_id| Err(nix::Error::EIO)),
    ));
    let file = MockFile::new();
    let handle = file.clone();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new(move |_id| Ok(handle.clone())),
    ));

    // Writes: a failure, a 1-byte partial write, another failure, then the remainder.
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"1234");
        Err(eio())
    })));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"1234");
        Ok(1)
    })));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"234");
        Err(eio())
    })));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"234");
        Ok(3)
    })));

    // File sync: one failure, then success.
    file.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
    file.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    writer
        .write(
            &mut h.shutdown_rx,
            b"1234",
            recording::Time(1),
            0,
            true,
            video_sample_entry_id,
        )
        .unwrap();

    // Directory sync: one failure, then success.
    h.dir
        .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO))));
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    drop(writer);
    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    file.ensure_done();
    h.dir.ensure_done();

    {
        let locked = h.db.lock();
        let stream = locked
            .streams_by_id()
            .get(&testutil::TEST_STREAM_ID)
            .unwrap();
        assert_eq!(stream.bytes_to_add, 0);
        assert_eq!(stream.sample_file_bytes, 4);
    }

    // The syncer should shut down cleanly.
    drop(h.channel);
    h.db.lock().clear_on_flush();
    assert_eq!(
        h.syncer_rx.try_recv().err(),
        Some(mpsc::TryRecvError::Disconnected)
    );
    assert!(h.syncer.planned_flushes.is_empty());
}
/// Checks that unlinking an old recording and the directory sync that follows are each
/// retried after an error during garbage collection.
#[test]
fn gc_path_retries() {
    testutil::init();
    let mut h = new_harness(0);
    let retention = db::RetentionChange {
        stream_id: testutil::TEST_STREAM_ID,
        new_record: true,
        new_limit: 0,
    };
    h.db.lock().update_retention(&[retention]).unwrap();

    // Setup: add a 3-byte recording.
    let entry = VideoSampleEntryToInsert {
        width: 1920,
        height: 1080,
        pasp_h_spacing: 1,
        pasp_v_spacing: 1,
        data: vec![0u8; 100],
        rfc6381_codec: String::from("avc1.000000"),
    };
    let video_sample_entry_id = h.db.lock().insert_video_sample_entry(entry).unwrap();
    let mut writer = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
    let file = MockFile::new();
    let handle = file.clone();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new(move |_id| Ok(handle.clone())),
    ));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"123");
        Ok(3)
    })));
    file.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    writer
        .write(
            &mut h.shutdown_rx,
            b"123",
            recording::Time(2),
            0,
            true,
            video_sample_entry_id,
        )
        .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    writer.close(Some(1), None).unwrap();
    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    file.ensure_done();
    h.dir.ensure_done();

    // Then a 1-byte recording.
    let file = MockFile::new();
    let handle = file.clone();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 1),
        Box::new(move |_id| Ok(handle.clone())),
    ));
    file.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"4");
        Ok(1)
    })));
    file.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    writer
        .write(
            &mut h.shutdown_rx,
            b"4",
            recording::Time(3),
            1,
            true,
            video_sample_entry_id,
        )
        .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    let db_for_unlink = h.db.clone();
    h.dir.expect(MockDirAction::Unlink(
        CompositeId::new(1, 0),
        Box::new(move |_| {
            // Dropping the writer below should cause the old recording to be deleted
            // (moved to garbage). Once the database is flushed, the syncer forces garbage
            // collection, which includes this unlink.
            // By now the sample file byte counts should already reflect the change, even
            // though the garbage itself has not been collected yet.
            let locked = db_for_unlink.lock();
            let stream = locked
                .streams_by_id()
                .get(&testutil::TEST_STREAM_ID)
                .unwrap();
            assert_eq!(stream.bytes_to_delete, 0);
            assert_eq!(stream.bytes_to_add, 0);
            assert_eq!(stream.sample_file_bytes, 1);
            Err(nix::Error::EIO) // force a retry.
        }),
    ));
    h.dir.expect(MockDirAction::Unlink(
        CompositeId::new(1, 0),
        Box::new(|_| Ok(())),
    ));
    h.dir
        .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO))));
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    drop(writer);

    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    file.ensure_done();
    h.dir.ensure_done();

    // Garbage should be marked collected on the next flush.
    {
        let mut locked = h.db.lock();
        let before = locked.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
        assert!(before.garbage_needs_unlink.is_empty());
        assert!(!before.garbage_unlinked.is_empty());
        locked.flush("forced gc").unwrap();
        let after = locked.sample_file_dirs_by_id().get(&h.dir_id).unwrap();
        assert!(after.garbage_needs_unlink.is_empty());
        assert!(after.garbage_unlinked.is_empty());
    }
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

    // The syncer should shut down cleanly.
    drop(h.channel);
    h.db.lock().clear_on_flush();
    assert_eq!(
        h.syncer_rx.try_recv().err(),
        Some(mpsc::TryRecvError::Disconnected)
    );
    assert!(h.syncer.planned_flushes.is_empty());
}
/// Checks planned-flush scheduling with `flush_if_sec=60`: a planned flush that a forced
/// flush has already covered is a no-op, while the later one performs exactly one flush.
#[test]
fn planned_flush() {
    testutil::init();
    let mut h = new_harness(60); // flush_if_sec=60

    // There's a database constraint forbidding a recording starting at t=0, so advance.
    h.db.clocks().sleep(time::Duration::seconds(1));

    // Setup: add a 3-byte recording.
    let video_sample_entry_id =
        h.db.lock()
            .insert_video_sample_entry(VideoSampleEntryToInsert {
                width: 1920,
                height: 1080,
                pasp_h_spacing: 1,
                pasp_v_spacing: 1,
                data: [0u8; 100].to_vec(),
                rfc6381_codec: "avc1.000000".to_owned(),
            })
            .unwrap();
    let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
    let f1 = MockFile::new();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 0),
        Box::new({
            let f = f1.clone();
            move |_id| Ok(f.clone())
        }),
    ));
    f1.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"123");
        Ok(3)
    })));
    f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    w.write(
        &mut h.shutdown_rx,
        b"123",
        recording::Time(recording::TIME_UNITS_PER_SEC),
        0,
        true,
        video_sample_entry_id,
    )
    .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
    drop(w);

    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 1);

    // Flush and let 30 seconds go by.
    h.db.lock().flush("forced").unwrap();
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    h.db.clocks().sleep(time::Duration::seconds(30));

    // Then, a 1-byte recording.
    let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID);
    let f2 = MockFile::new();
    h.dir.expect(MockDirAction::Create(
        CompositeId::new(1, 1),
        Box::new({
            let f = f2.clone();
            move |_id| Ok(f.clone())
        }),
    ));
    f2.expect(MockFileAction::Write(Box::new(|buf| {
        assert_eq!(buf, b"4");
        Ok(1)
    })));
    f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
    w.write(
        &mut h.shutdown_rx,
        b"4",
        recording::Time(31 * recording::TIME_UNITS_PER_SEC),
        1,
        true,
        video_sample_entry_id,
    )
    .unwrap();
    h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));

    drop(w);

    assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave
    assert_eq!(h.syncer.planned_flushes.len(), 2);

    let db_flush_count_before = h.db.lock().flushes();
    assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0));
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush (no-op)
    assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0));
    assert_eq!(h.db.lock().flushes(), db_flush_count_before);
    assert_eq!(h.syncer.planned_flushes.len(), 1);
    assert!(h.syncer.iter(&h.syncer_rx)); // planned flush
    assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0));
    assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1);
    assert_eq!(h.syncer.planned_flushes.len(), 0);
    assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed

    f1.ensure_done();
    f2.ensure_done();
    h.dir.ensure_done();

    // The syncer should shut down cleanly.
    drop(h.channel);
    h.db.lock().clear_on_flush();
    assert_eq!(
        h.syncer_rx.try_recv().err(),
        Some(std::sync::mpsc::TryRecvError::Disconnected)
    );
    assert!(h.syncer.planned_flushes.is_empty());
}
}