// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.

//! Writing recordings and deleting old ones.

use crate::db::{self, CompositeId};
use crate::dir;
use crate::recording::{self, MAX_RECORDING_WALL_DURATION};
use base::clock::{self, Clocks};
use base::shutdown::ShutdownError;
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
use log::{debug, trace, warn};
use parking_lot::Mutex;
use std::cmp::{self, Ordering};
use std::convert::TryFrom;
use std::io;
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration as StdDuration;
use time::{Duration, Timespec};

// NOTE(review): several generic parameter lists in this copy of the file look like they were
// lost in extraction (bare `Result`, `Arc`, `Vec>`; `F`, `D` and `C` used without a visible
// declaration). The code is reproduced as-is; confirm against the canonical source.

/// Trait to allow mocking out [crate::dir::SampleFileDir] in syncer tests.
/// This is public because it's exposed in the [SyncerChannel] type parameters,
/// not because it's of direct use outside this module.
pub trait DirWriter: 'static + Send {
    /// The file type handed back by `create_file`; it must itself be a [FileWriter].
    type File: FileWriter;

    /// Creates the sample file for the recording `id`.
    fn create_file(&self, id: CompositeId) -> Result;

    /// Syncs the directory itself.
    fn sync(&self) -> Result<(), nix::Error>;

    /// Unlinks the sample file for the recording `id`.
    fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error>;
}

/// Trait to allow mocking out [std::fs::File] in syncer tests.
/// This is public because it's exposed in the [SyncerChannel] type parameters,
/// not because it's of direct use outside this module.
pub trait FileWriter: 'static {
    /// As in `std::fs::File::sync_all`.
    fn sync_all(&self) -> Result<(), io::Error>;

    /// As in `std::io::Write::write`.
    fn write(&mut self, buf: &[u8]) -> Result;
}

// Production implementation: every method forwards to the function of the same name on
// `dir::SampleFileDir`.
impl DirWriter for Arc {
    type File = ::std::fs::File;

    fn create_file(&self, id: CompositeId) -> Result {
        dir::SampleFileDir::create_file(self, id)
    }
    fn sync(&self) -> Result<(), nix::Error> {
        dir::SampleFileDir::sync(self)
    }
    fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error> {
        dir::SampleFileDir::unlink_file(self, id)
    }
}

// Production implementation backed by a real file.
impl FileWriter for ::std::fs::File {
    fn sync_all(&self) -> Result<(), io::Error> {
        // Method resolution prefers the inherent `File::sync_all` over this trait method, so
        // this is a forwarding call rather than a recursive one.
        self.sync_all()
    }
    fn write(&mut self, buf: &[u8]) -> Result {
        // Fully qualified so the call goes to `io::Write::write` rather than back into this
        // trait method.
        io::Write::write(self, buf)
    }
}

/// A command sent to a [Syncer].
enum SyncerCommand {
    /// Command sent by [SyncerChannel::async_save_recording].
    AsyncSaveRecording(CompositeId, recording::Duration, F),

    /// Notes that the database has been flushed and garbage collection should be attempted.
    /// [start_syncer] sets up a database callback to send this command.
    DatabaseFlushed,

    /// Command sent by [SyncerChannel::flush].
    Flush(mpsc::SyncSender<()>),
}

/// A channel which can be used to send commands to the syncer.
/// Can be cloned to allow multiple threads to send commands.
pub struct SyncerChannel(mpsc::Sender>);

// Hand-written `Clone`: a clone is simply another handle onto the same underlying sender.
impl ::std::clone::Clone for SyncerChannel {
    fn clone(&self) -> Self {
        SyncerChannel(self.0.clone())
    }
}

/// State of the worker thread created by [start_syncer].
struct Syncer {
    /// Id of the sample file directory this syncer is responsible for.
    dir_id: i32,

    /// Directory operations (create/sync/unlink), real or mocked.
    dir: D,

    /// Shared handle to the database.
    db: Arc>,

    /// Pending flushes; the heap yields the soonest one first (see `Ord for PlannedFlush`).
    planned_flushes: std::collections::BinaryHeap,

    /// Passed to `clock::retry` by the worker-thread methods.
    shutdown_rx: base::shutdown::Receiver,
}

/// A plan to flush at a given instant due to a recently-saved recording's `flush_if_sec` parameter.
struct PlannedFlush {
    /// Monotonic time at which this flush should happen.
    when: Timespec,

    /// Recording which prompts this flush. If this recording is already flushed at the planned
    /// time, it can be skipped.
    recording: CompositeId,

    /// A human-readable reason for the flush, for logs.
    reason: String,

    /// Senders to drop when this time is reached. This is for test instrumentation; see
    /// [SyncerChannel::flush].
    senders: Vec>,
}

// PlannedFlush is meant for placement in a max-heap which should return the soonest flush. This
// PlannedFlush is greater than other if its when is _less_ than the other's.
impl Ord for PlannedFlush {
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands are deliberately swapped to reverse the natural ordering of `when`.
        other.when.cmp(&self.when)
    }
}

impl PartialOrd for PlannedFlush {
    fn partial_cmp(&self, other: &Self) -> Option {
        Some(self.cmp(other))
    }
}

// Equality considers only `when`, which keeps it consistent with `Ord` above; the other fields
// are ignored.
impl PartialEq for PlannedFlush {
    fn eq(&self, other: &Self) -> bool {
        self.when == other.when
    }
}

impl Eq for PlannedFlush {}

/// Starts a syncer for the given sample file directory.
///
/// The lock must not be held on `db` when this is called.
///
/// There should be only one syncer per directory, or 0 if operating in read-only mode.
/// This function will perform the initial rotation synchronously, so that it is finished before
/// file writing starts. Afterward the syncing happens in a background thread.
///
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
/// be persisted.
///
/// Note that dropping all `SyncerChannel` clones currently includes calling
/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
pub fn start_syncer(
    db: Arc>,
    shutdown_rx: base::shutdown::Receiver,
    dir_id: i32,
) -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error>
where
    C: Clocks + Clone,
{
    // The syncer keeps its own handle to the database; `db` itself is still needed below to
    // install the flush hook.
    let db2 = db.clone();
    let (mut syncer, path) = Syncer::new(&db.lock(), shutdown_rx, db2, dir_id)?;

    // Done on the calling thread, before the worker thread exists.
    syncer.initial_rotation()?;
    let (snd, rcv) = mpsc::channel();

    // The hook owns a clone of the sender, so the channel stays connected until the hook is
    // removed as well as every `SyncerChannel` being dropped.
    db.lock().on_flush(Box::new({
        let snd = snd.clone();
        move || {
            if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
                warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
            }
        }
    }));
    Ok((
        SyncerChannel(snd),
        thread::Builder::new()
            .name(format!("sync-{}", path))
            // The worker loops until `iter` returns false.
            .spawn(move || while syncer.iter(&rcv) {})
            // Panics if the thread cannot be spawned.
            .unwrap(),
    ))
}

/// A new retention limit for use in [lower_retention].
pub struct NewLimit {
    /// Stream the limit applies to.
    pub stream_id: i32,

    /// The new limit; compared against the stream's byte counters in [lower_retention].
    pub limit: i64,
}

/// Immediately deletes recordings if necessary to fit within the given new `retain_bytes` limit.
/// Note this doesn't change the limit in the database; it only deletes files.
/// Pass a limit of 0 to delete all recordings associated with a camera.
///
/// This is expected to be performed from `moonfire-nvr config` when no syncer is running.
/// It potentially flushes the database twice (before and after the actual deletion).
///
/// Returns an error if a stream named in `limits` does not exist, or if constructing the
/// syncer or the rotation itself fails.
pub fn lower_retention(
    db: Arc,
    dir_id: i32,
    limits: &[NewLimit],
) -> Result<(), Error> {
    let db2 = db.clone();

    // `_tx` is bound (not discarded with `_`) so the sending half lives until this function
    // returns.
    let (_tx, rx) = base::shutdown::channel();
    let (mut syncer, _) = Syncer::new(&db.lock(), rx, db2, dir_id)?;
    syncer.do_rotation(|db| {
        for l in limits {
            let (fs_bytes_before, extra);
            {
                let stream = db
                    .streams_by_id()
                    .get(&l.stream_id)
                    .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
                fs_bytes_before =
                    stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete;

                // `delete_recordings` subtracts the stream's current `retain_bytes`; passing
                // this difference makes it effectively target `l.limit` instead.
                extra = stream.retain_bytes - l.limit;
            }
            if l.limit >= fs_bytes_before {
                // Already within the new limit; nothing to delete for this stream.
                continue;
            }
            delete_recordings(db, l.stream_id, extra)?;
        }
        Ok(())
    })
}

/// Enqueues deletion of recordings to bring a stream's disk usage within bounds.
/// The next flush will mark the recordings as garbage in the SQLite database, and then they can
/// be deleted from disk.
///
/// `extra_bytes_needed` is added to the stream's current usage before comparing against its
/// `retain_bytes`. Returns an error if `stream_id` is unknown or if
/// `delete_oldest_recordings` fails.
fn delete_recordings(
    db: &mut db::LockedDatabase,
    stream_id: i32,
    extra_bytes_needed: i64,
) -> Result<(), Error> {
    // Number of bytes by which the stream (plus the extra requirement) exceeds its quota;
    // zero or negative means it already fits.
    let fs_bytes_needed = {
        let stream = match db.streams_by_id().get(&stream_id) {
            None => bail!("no stream {}", stream_id),
            Some(s) => s,
        };
        stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
            - stream.retain_bytes
    };
    let mut fs_bytes_to_delete = 0;
    if fs_bytes_needed <= 0 {
        debug!(
            "{}: have remaining quota of {}",
            stream_id,
            base::strutil::encode_size(-fs_bytes_needed)
        );
        return Ok(());
    }

    // NOTE(review): `n` is incremented below but never read in this function.
    let mut n = 0;
    db.delete_oldest_recordings(stream_id, &mut |row| {
        // Returning true selects this row and moves on to the next; false stops selecting.
        // NOTE(review): because the test is `>=`, a row is still selected when the running
        // total already equals the requirement exactly, i.e. one more recording than strictly
        // necessary is removed in that case. Confirm this is intended.
        if fs_bytes_needed >= fs_bytes_to_delete {
            fs_bytes_to_delete += db::round_up(i64::from(row.sample_file_bytes));
            n += 1;
            return true;
        }
        false
    })?;
    Ok(())
}

impl SyncerChannel {
    /// Asynchronously syncs the given writer, closes it, records it into the database, and
    /// starts rotation.
    ///
    /// Panics if the command cannot be sent (the receiving end of the channel is gone).
    fn async_save_recording(&self, id: CompositeId, wall_duration: recording::Duration, f: F) {
        self.0
            .send(SyncerCommand::AsyncSaveRecording(id, wall_duration, f))
            .unwrap();
    }

    /// For testing: flushes the syncer, waiting for all currently-queued commands to complete,
    /// including the next scheduled database flush (if any). Note this doesn't wait for any
    /// post-database flush garbage collection.
    pub fn flush(&self) {
        // Nothing is ever sent on this channel; the syncer signals completion by dropping the
        // sender, which makes `recv` below return an error.
        let (snd, rcv) = mpsc::sync_channel(0);
        self.0.send(SyncerCommand::Flush(snd)).unwrap();
        rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
    }
}

/// Lists files which should be "abandoned" (deleted without ever recording in the database)
/// on opening.
fn list_files_to_abandon(
    dir: &dir::SampleFileDir,
    streams_to_next: FnvHashMap,
) -> Result, Error> {
    // `streams_to_next` maps a stream id to the first recording id considered abandoned for
    // that stream; the caller in `Syncer::new` fills it from each stream's `cum_recordings`.
    let mut v = Vec::new();
    let mut d = dir.opendir()?;
    for e in d.iter() {
        let e = e?;

        // Entries whose names don't parse as a recording id are ignored.
        let id = match dir::parse_id(e.file_name().to_bytes()) {
            Ok(i) => i,
            Err(_) => continue,
        };
        let next = match streams_to_next.get(&id.stream()) {
            Some(n) => *n,
            None => continue, // unknown stream.
        };
        if id.recording() >= next {
            v.push(id);
        }
    }
    Ok(v)
}

impl Syncer> {
    /// Builds a syncer for directory `dir_id`, first unlinking any abandoned files found in it.
    ///
    /// `l` is the already-locked database used for lookups; `db` is the shared handle the
    /// syncer stores. Returns the syncer and a clone of the directory's `path`. Fails if the
    /// directory id is unknown, if the directory cannot be obtained or listed, or if any
    /// abandoned file could not be unlinked for a reason other than `ENOENT`.
    fn new(
        l: &db::LockedDatabase,
        shutdown_rx: base::shutdown::Receiver,
        db: Arc>,
        dir_id: i32,
    ) -> Result<(Self, String), Error> {
        let d = l
            .sample_file_dirs_by_id()
            .get(&dir_id)
            .ok_or_else(|| format_err!("no dir {}", dir_id))?;
        let dir = d.get()?;

        // Abandon files.
        // First, get a list of the streams in question.
        let streams_to_next: FnvHashMap<_, _> = l
            .streams_by_id()
            .iter()
            .filter_map(|(&k, v)| {
                // Only streams assigned to this directory are relevant.
                if v.sample_file_dir_id == Some(dir_id) {
                    Some((k, v.cum_recordings))
                } else {
                    None
                }
            })
            .collect();
        let to_abandon = list_files_to_abandon(&dir, streams_to_next)?;
        let mut undeletable = 0;
        for &id in &to_abandon {
            if let Err(e) = dir.unlink_file(id) {
                if e == nix::Error::ENOENT {
                    // Already gone: worth a warning but not counted as a failure.
                    warn!("dir: abandoned recording {} already deleted!", id);
                } else {
                    warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
                    undeletable += 1;
                }
            }
        }
        if undeletable > 0 {
            bail!("Unable to delete {} abandoned recordings.", undeletable);
        }

        Ok((
            Syncer {
                dir_id,
                shutdown_rx,
                dir,
                db,
                planned_flushes: std::collections::BinaryHeap::new(),
            },
            d.path.clone(),
        ))
    }

    /// Rotates files for all streams and deletes stale files from previous runs.
    /// Called from main thread.
    fn initial_rotation(&mut self) -> Result<(), Error> {
        self.do_rotation(|db| {
            // Copy the ids out first so the map isn't borrowed while `db` is mutated below.
            let streams: Vec = db.streams_by_id().keys().copied().collect();
            for &stream_id in &streams {
                delete_recordings(db, stream_id, 0)?;
            }
            Ok(())
        })
    }

    /// Helper to do initial or retention-lowering rotation. Called from main thread.
fn do_rotation(&mut self, delete_recordings: F) -> Result<(), Error>
    where
        F: Fn(&mut db::LockedDatabase) -> Result<(), Error>,
    {
        // Step 1: let the caller enqueue deletions, then flush, all under one lock acquisition.
        {
            let mut db = self.db.lock();
            delete_recordings(&mut *db)?;
            db.flush("synchronous deletion")?;
        }

        // Step 2: snapshot the ids awaiting unlink; the lock is released at the end of this block.
        let mut garbage: Vec<_> = {
            let l = self.db.lock();
            let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
            d.garbage_needs_unlink.iter().copied().collect()
        };
        if !garbage.is_empty() {
            // Try to delete files; retain ones in `garbage` that don't exist.
            let mut errors = 0;
            for &id in &garbage {
                if let Err(e) = self.dir.unlink_file(id) {
                    // A missing file (`ENOENT`) is not counted as an error.
                    if e != nix::Error::ENOENT {
                        warn!("dir: Unable to unlink {}: {}", id, e);
                        errors += 1;
                    }
                }
            }
            if errors > 0 {
                bail!(
                    "Unable to unlink {} files (see earlier warning messages for details)",
                    errors
                );
            }

            // Step 3: sync the directory, report the garbage to the database, flush again.
            self.dir.sync()?;
            self.db.lock().delete_garbage(self.dir_id, &mut garbage)?;
            self.db.lock().flush("synchronous garbage collection")?;
        }
        Ok(())
    }
}

impl Syncer {
    /// Processes a single command or timeout.
    ///
    /// Returns true iff the loop should continue.
    fn iter(&mut self, cmds: &mpsc::Receiver>) -> bool {
        // Wait for a command, the next flush timeout (if specified), or channel disconnect.
        let next_flush = self.planned_flushes.peek().map(|f| f.when);
        let cmd = match next_flush {
            None => match cmds.recv() {
                Err(_) => return false, // all cmd senders are gone.
                Ok(cmd) => cmd,
            },
            Some(t) => {
                let now = self.db.clocks().monotonic();

                // Calculate the timeout to use, mapping negative durations to 0.
                let timeout = (t - now)
                    .to_std()
                    .unwrap_or_else(|_| StdDuration::new(0, 0));
                match self.db.clocks().recv_timeout(&cmds, timeout) {
                    Err(mpsc::RecvTimeoutError::Disconnected) => return false, // cmd senders gone.
                    Err(mpsc::RecvTimeoutError::Timeout) => {
                        self.flush();
                        return true;
                    }
                    Ok(cmd) => cmd,
                }
            }
        };

        // Have a command; handle it.
        match cmd {
            // An error from `save` or `collect_garbage` (a `ShutdownError`) stops the loop.
            SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => {
                if self.save(id, wall_dur, f).is_err() {
                    return false;
                }
            }
            SyncerCommand::DatabaseFlushed => {
                if self.collect_garbage().is_err() {
                    return false;
                }
            }
            SyncerCommand::Flush(flush) => {
                // The sender is waiting for the supplied writer to be dropped. If there's no
                // timeout, do so immediately; otherwise wait for that timeout then drop it.
                // (With no planned flush, `flush` simply goes out of scope at the end of this
                // arm, which drops it.)
                if let Some(mut f) = self.planned_flushes.peek_mut() {
                    f.senders.push(flush);
                }
            }
        };

        true
    }

    /// Collects garbage (without forcing a sync). Called from worker thread.
    ///
    /// Each step is wrapped in `clock::retry`; its error is propagated as a `ShutdownError`.
    fn collect_garbage(&mut self) -> Result<(), ShutdownError> {
        trace!("Collecting garbage");

        // Snapshot the ids awaiting unlink; the database lock is released at the end of the block.
        let mut garbage: Vec<_> = {
            let l = self.db.lock();
            let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
            d.garbage_needs_unlink.iter().copied().collect()
        };
        if garbage.is_empty() {
            return Ok(());
        }
        let c = &self.db.clocks();
        for &id in &garbage {
            clock::retry(c, &self.shutdown_rx, &mut || {
                if let Err(e) = self.dir.unlink_file(id) {
                    // A file that is already gone counts as success.
                    if e == nix::Error::ENOENT {
                        warn!("dir: recording {} already deleted!", id);
                        return Ok(());
                    }
                    return Err(e);
                }
                Ok(())
            })?;
        }
        clock::retry(c, &self.shutdown_rx, &mut || self.dir.sync())?;
        clock::retry(c, &self.shutdown_rx, &mut || {
            self.db.lock().delete_garbage(self.dir_id, &mut garbage)
        })?;
        Ok(())
    }

    /// Saves the given recording and prompts rotation. Called from worker thread.
    /// Note that this doesn't flush immediately; SQLite transactions are batched to lower SSD
    /// wear. On the next flush, the old recordings will actually be marked as garbage in the
    /// database, and shortly afterward actually deleted from disk.
    fn save(
        &mut self,
        id: CompositeId,
        wall_duration: recording::Duration,
        f: D::File,
    ) -> Result<(), ShutdownError> {
        trace!("Processing save for {}", id);
        let stream_id = id.stream();

        // Free up a like number of bytes.
        // NOTE(review): the comment above appears to describe the `delete_recordings` call
        // further down; the two statements immediately below sync the file and then the
        // directory.
        clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || f.sync_all())?;
        clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || {
            self.dir.sync()
        })?;
        let mut db = self.db.lock();

        // These `unwrap`s panic the worker thread if the database calls or lookups fail.
        db.mark_synced(id).unwrap();
        delete_recordings(&mut db, stream_id, 0).unwrap();
        let s = db.streams_by_id().get(&stream_id).unwrap();
        let c = db.cameras_by_id().get(&s.camera_id).unwrap();

        // Schedule a flush.
        // The deadline is `flush_if_sec` minus the recording's wall duration, measured from
        // now; `iter` maps a negative timeout to zero.
        let how_soon = Duration::seconds(s.flush_if_sec) - wall_duration.to_tm_duration();
        let now = self.db.clocks().monotonic();
        let when = now + how_soon;
        let reason = format!(
            "{} sec after start of {} {}-{} recording {}",
            s.flush_if_sec,
            wall_duration,
            c.short_name,
            s.type_.as_str(),
            id
        );
        trace!("scheduling flush in {} because {}", how_soon, &reason);
        self.planned_flushes.push(PlannedFlush {
            when,
            reason,
            recording: id,
            senders: Vec::new(),
        });
        Ok(())
    }

    /// Flushes the database if necessary to honor `flush_if_sec` for some recording.
    /// Called from worker thread when one of the `planned_flushes` arrives.
    fn flush(&mut self) {
        trace!("Flushing");
        let mut l = self.db.lock();

        // Look through the planned flushes and see if any are still relevant. It's possible
        // they're not because something else (e.g., a syncer for a different sample file dir)
        // has flushed the database in the meantime.
        use std::collections::binary_heap::PeekMut;
        while let Some(f) = self.planned_flushes.peek_mut() {
            let s = match l.streams_by_id().get(&f.recording.stream()) {
                Some(s) => s,
                None => {
                    // Removing streams while running hasn't been implemented yet, so this should
                    // be impossible.
                    warn!(
                        "bug: no stream for {} which was scheduled to be flushed",
                        f.recording
                    );
                    PeekMut::pop(f);
                    continue;
                }
            };

            if s.cum_recordings <= f.recording.recording() {
                // not yet committed.
                break;
            }

            trace!("planned flush ({}) no longer needed", &f.reason);

            // The popped entry is discarded here, which also drops any `senders` it holds.
            PeekMut::pop(f);
        }

        // If there's anything left to do now, try to flush.
        let f = match self.planned_flushes.peek() {
            None => return,
            Some(f) => f,
        };
        let now = self.db.clocks().monotonic();
        if f.when > now {
            // The soonest remaining flush isn't due yet.
            return;
        }
        if let Err(e) = l.flush(&f.reason) {
            let d = Duration::minutes(1);
            warn!(
                "flush failure on save for reason {}; will retry after {}: {:?}",
                f.reason, d, e
            );

            // Push the head entry's deadline back by a minute so it is retried later.
            self.planned_flushes
                .peek_mut()
                .expect("planned_flushes is non-empty")
                .when = self.db.clocks().monotonic() + Duration::minutes(1);
            return;
        }

        // A successful flush should take care of everything planned.
        self.planned_flushes.clear();
    }
}

/// Struct for writing a single run (of potentially several recordings) to disk and committing its
/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
/// or panics (if further writing on this stream is impossible).
pub struct Writer<'a, C: Clocks + Clone, D: DirWriter> {
    dir: &'a D,
    db: &'a db::Database,
    channel: &'a SyncerChannel,
    stream_id: i32,
    video_sample_entry_id: i32,
    state: WriterState,
}

/// Lifecycle of a [Writer]: nothing opened yet, a recording in progress, or the previous
/// recording closed (keeping what is needed to continue the run).
enum WriterState {
    Unopened,
    Open(InnerWriter),
    Closed(PreviousWriter),
}

/// State for writing a single recording, used within [Writer].
///
/// Note that the recording created by every `InnerWriter` must be written to the [SyncerChannel]
/// with at least one sample. The sample may have zero duration.
struct InnerWriter {
    /// The sample file being written.
    f: F,

    /// Shared recording state, as returned by `add_recording`.
    r: Arc>,

    /// Encoder for the sample index.
    e: recording::SampleIndexEncoder,

    /// Id of the recording being written.
    id: CompositeId,

    /// Running hash over every packet written to the file.
    hasher: blake3::Hasher,

    /// The start time of this recording, based solely on examining the local clock after frames in
    /// this recording were received. Frames can suffer from various kinds of delay (initial
    /// buffering, encoding, and network transmission), so this time is set to far in the future on
    /// construction, given a real value on the first packet, and decreased as less-delayed packets
    /// are discovered. See design/time.md for details.
    local_start: recording::Time,

    /// A sample which has been written to disk but not added to `index`. Index writes are one
    /// sample behind disk writes because the duration of a sample is the difference between its
    /// pts and the next sample's pts. A sample is flushed when the next sample is written, when
    /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
    /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
    ///
    /// `unindexed_sample` should always be `Some`, except when a `write` call has aborted on
    /// shutdown. In that case, the close will be unable to write the full segment.
    unindexed_sample: Option,
}

/// A sample which has been written to disk but not included in the index yet.
/// The index includes the sample's duration, which is calculated from the
/// _following_ sample's pts, so the most recent sample is always unindexed.
#[derive(Copy, Clone)]
struct UnindexedSample {
    local_time: recording::Time,
    pts_90k: i64, // relative to the start of the run, not a single recording.
    len: i32,
    is_key: bool,
}

/// State associated with a run's previous recording; used within [Writer].
#[derive(Copy, Clone)]
struct PreviousWriter {
    /// End time of the previous recording; used as the next recording's start.
    end: recording::Time,

    /// Offset of the previous recording within the run; the next one uses `run_offset + 1`.
    run_offset: i32,
}

impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
    /// `db` must not be locked.
    ///
    /// The writer starts in the `Unopened` state; no recording or file is created here.
    pub fn new(
        dir: &'a D,
        db: &'a db::Database,
        channel: &'a SyncerChannel,
        stream_id: i32,
        video_sample_entry_id: i32,
    ) -> Self {
        Writer {
            dir,
            db,
            channel,
            stream_id,
            video_sample_entry_id,
            state: WriterState::Unopened,
        }
    }

    /// Opens a new writer.
    /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
    /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
    /// correcting this.
fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> {
        let prev = match self.state {
            WriterState::Unopened => None,
            // Already open: nothing to do.
            WriterState::Open(_) => return Ok(()),
            WriterState::Closed(prev) => Some(prev),
        };
        let (id, r) = self.db.lock().add_recording(
            self.stream_id,
            db::RecordingToInsert {
                // Continue the run after a previous recording; otherwise start a new run at 0.
                run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
                // With no previous recording, the start is a far-future placeholder that
                // `add_sample` replaces (it overwrites `start` when `run_offset == 0`).
                start: prev
                    .map(|p| p.end)
                    .unwrap_or(recording::Time(i64::max_value())),
                video_sample_entry_id: self.video_sample_entry_id,
                flags: db::RecordingFlags::Growing as i32,
                ..Default::default()
            },
        )?;
        let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || {
            self.dir.create_file(id)
        })?;

        self.state = WriterState::Open(InnerWriter {
            f,
            r,
            e: recording::SampleIndexEncoder::default(),
            id,
            hasher: blake3::Hasher::new(),
            local_start: recording::Time(i64::max_value()),
            unindexed_sample: None,
        });
        Ok(())
    }

    /// Reports whether a recording has been opened (and since closed) by this writer:
    /// `false` when unopened, `true` when closed. Returns an error if one is currently open.
    pub fn previously_opened(&self) -> Result {
        Ok(match self.state {
            WriterState::Unopened => false,
            WriterState::Closed(_) => true,
            WriterState::Open(_) => bail!("open!"),
        })
    }

    /// Writes a new frame to this recording.
    /// `local_time` should be the local clock's time as of when this packet was received.
    ///
    /// Opens a recording first if none is open. Returns an error if `pts_90k` does not exceed
    /// the previous sample's pts, if the gap does not fit in an `i32`, if indexing the previous
    /// sample fails, or if the write is abandoned by `clock::retry`.
    pub fn write(
        &mut self,
        shutdown_rx: &mut base::shutdown::Receiver,
        pkt: &[u8],
        local_time: recording::Time,
        pts_90k: i64,
        is_key: bool,
    ) -> Result<(), Error> {
        self.open(shutdown_rx)?;
        let w = match self.state {
            WriterState::Open(ref mut w) => w,
            _ => unreachable!(),
        };

        // Note w's invariant that `unindexed_sample` is `Some` may currently be violated.
        // We must restore it on all success or error paths.

        // Index the previous sample now that its duration (the pts delta) is known.
        if let Some(unindexed) = w.unindexed_sample.take() {
            let duration = pts_90k - unindexed.pts_90k;
            if duration <= 0 {
                w.unindexed_sample = Some(unindexed); // restore invariant.
                bail!(
                    "pts not monotonically increasing; got {} then {}",
                    unindexed.pts_90k,
                    pts_90k
                );
            }
            let duration = match i32::try_from(duration) {
                Ok(d) => d,
                Err(_) => {
                    w.unindexed_sample = Some(unindexed); // restore invariant.
                    bail!(
                        "excessive pts jump from {} to {}",
                        unindexed.pts_90k,
                        pts_90k
                    )
                }
            };
            if let Err(e) = w.add_sample(
                duration,
                unindexed.len,
                unindexed.is_key,
                unindexed.local_time,
                self.db,
                self.stream_id,
            ) {
                w.unindexed_sample = Some(unindexed); // restore invariant.
                return Err(e);
            }
        }

        // Write the whole packet, looping because each `write` call may accept only part of it.
        let mut remaining = pkt;
        while !remaining.is_empty() {
            let written =
                match clock::retry(&self.db.clocks(), shutdown_rx, &mut || w.f.write(remaining)) {
                    Ok(w) => w,
                    Err(e) => {
                        // close() will do nothing because unindexed_sample will be None.
                        log::warn!(
                            "Abandoning incompletely written recording {} on shutdown",
                            w.id
                        );
                        return Err(e.into());
                    }
                };
            remaining = &remaining[written..];
        }

        // This packet becomes the new unindexed sample. The `unwrap` panics if the packet
        // length does not fit in an `i32`.
        w.unindexed_sample = Some(UnindexedSample {
            local_time,
            pts_90k,
            len: i32::try_from(pkt.len()).unwrap(),
            is_key,
        });
        w.hasher.update(pkt);
        Ok(())
    }

    /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
    /// duration (if known). If `close` is not called, the `Drop` trait impl will close the writer,
    /// swallowing errors and using a zero duration for the last sample.
    pub fn close(&mut self, next_pts: Option) -> Result<(), Error> {
        // The state is swapped out for `Unopened` first, so if the inner close fails (the `?`
        // below) the writer is left `Unopened`.
        self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
            WriterState::Open(w) => {
                let prev = w.close(self.channel, next_pts, self.db, self.stream_id)?;
                WriterState::Closed(prev)
            }
            // Not open: put the original state back unchanged.
            s => s,
        };
        Ok(())
    }
}

/// Restricts `v` to the range `min..=max`: raises it to at least `min`, then caps it at `max`.
fn clamp(v: i64, min: i64, max: i64) -> i64 {
    std::cmp::min(std::cmp::max(v, min), max)
}

impl InnerWriter {
    /// Adds one sample of `duration_90k` and `bytes` to the index, updates the recording's
    /// start and wall duration, and sends a live segment for the newly covered media range.
    ///
    /// Returns an error if the resulting wall duration exceeds `MAX_RECORDING_WALL_DURATION`.
    fn add_sample(
        &mut self,
        duration_90k: i32,
        bytes: i32,
        is_key: bool,
        pkt_local_time: recording::Time,
        db: &db::Database,
        stream_id: i32,
    ) -> Result<(), Error> {
        let mut l = self.r.lock();

        // design/time.md explains these time manipulations in detail.
        let prev_media_duration_90k = l.media_duration_90k;
        let media_duration_90k = l.media_duration_90k + duration_90k;

        // Keep the earliest start implied by any packet so far: the packet's local time minus
        // the media duration including this sample.
        let local_start = cmp::min(
            self.local_start,
            pkt_local_time - recording::Duration(i64::from(media_duration_90k)),
        );
        let limit = i64::from(media_duration_90k / 2000); // 1/2000th, aka 500 ppm.
        let start = if l.run_offset == 0 {
            // Start time isn't anchored to previous recording's end; adjust.
            local_start
        } else {
            l.start
        };

        // Wall duration is the media duration plus the start discrepancy, with the adjustment
        // bounded to +/- `limit`.
        let wall_duration_90k = media_duration_90k
            + i32::try_from(clamp(local_start.0 - start.0, -limit, limit)).unwrap();
        if wall_duration_90k > i32::try_from(MAX_RECORDING_WALL_DURATION).unwrap() {
            bail!(
                "Duration {} exceeds maximum {}",
                wall_duration_90k,
                MAX_RECORDING_WALL_DURATION
            );
        }
        l.wall_duration_90k = wall_duration_90k;
        l.start = start;
        self.local_start = local_start;
        self.e.add_sample(duration_90k, bytes, is_key, &mut l);

        // Release the recording lock before taking the database lock.
        drop(l);
        db.lock()
            .send_live_segment(
                stream_id,
                db::LiveSegment {
                    recording: self.id.recording(),
                    is_key,
                    media_off_90k: prev_media_duration_90k..media_duration_90k,
                },
            )
            .unwrap();
        Ok(())
    }

    /// Consumes the writer: indexes the final sample and hands the recording to the syncer.
    ///
    /// With `next_pts` of `None`, the final sample gets zero duration and the recording is
    /// flagged `TrailingZero`. Returns an error if there is no unindexed sample (a `write` was
    /// aborted), if the final duration does not fit in an `i32`, or if `add_sample` fails.
    fn close(
        mut self,
        channel: &SyncerChannel,
        next_pts: Option,
        db: &db::Database,
        stream_id: i32,
    ) -> Result {
        let unindexed = self.unindexed_sample.take().ok_or_else(|| {
            format_err!(
                "Unable to add recording {} to database due to aborted write",
                self.id
            )
        })?;
        let (last_sample_duration, flags) = match next_pts {
            None => (0, db::RecordingFlags::TrailingZero as i32),
            Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0),
        };
        let blake3 = self.hasher.finalize();

        let (run_offset, end);
        self.add_sample(
            last_sample_duration,
            unindexed.len,
            unindexed.is_key,
            unindexed.local_time,
            db,
            stream_id,
        )?;

        // This always ends a live segment.
let wall_duration; { let mut l = self.r.lock(); l.flags = flags; l.local_time_delta = self.local_start - l.start; l.sample_file_blake3 = Some(*blake3.as_bytes()); wall_duration = recording::Duration(i64::from(l.wall_duration_90k)); run_offset = l.run_offset; end = l.start + wall_duration; } drop(self.r); channel.async_save_recording(self.id, wall_duration, self.f); Ok(PreviousWriter { end, run_offset }) } } impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> { fn drop(&mut self) { if ::std::thread::panicking() { // This will probably panic again. Don't do it. return; } if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. let _ = w.close(self.channel, None, self.db, self.stream_id); } } } #[cfg(test)] mod tests { use super::Writer; use crate::db::{self, CompositeId, VideoSampleEntryToInsert}; use crate::recording; use crate::testutil; use base::clock::{Clocks, SimulatedClocks}; use log::{trace, warn}; use parking_lot::Mutex; use std::collections::VecDeque; use std::io; use std::sync::mpsc; use std::sync::Arc; #[derive(Clone)] struct MockDir(Arc>>); enum MockDirAction { Create( CompositeId, Box Result + Send>, ), Sync(Box Result<(), nix::Error> + Send>), Unlink( CompositeId, Box Result<(), nix::Error> + Send>, ), } impl MockDir { fn new() -> Self { MockDir(Arc::new(Mutex::new(VecDeque::new()))) } fn expect(&self, action: MockDirAction) { self.0.lock().push_back(action); } fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); } } impl super::DirWriter for MockDir { type File = MockFile; fn create_file(&self, id: CompositeId) -> Result { match self .0 .lock() .pop_front() .expect("got create_file with no expectation") { MockDirAction::Create(expected_id, ref f) => { assert_eq!(id, expected_id); f(id) } _ => panic!("got 
create_file({}), expected something else", id), } } fn sync(&self) -> Result<(), nix::Error> { match self .0 .lock() .pop_front() .expect("got sync with no expectation") { MockDirAction::Sync(f) => f(), _ => panic!("got sync, expected something else"), } } fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error> { match self .0 .lock() .pop_front() .expect("got unlink_file with no expectation") { MockDirAction::Unlink(expected_id, f) => { assert_eq!(id, expected_id); f(id) } _ => panic!("got unlink({}), expected something else", id), } } } impl Drop for MockDir { fn drop(&mut self) { if !::std::thread::panicking() { assert_eq!(self.0.lock().len(), 0); } } } #[derive(Clone)] struct MockFile(Arc>>); enum MockFileAction { SyncAll(Box Result<(), io::Error> + Send>), Write(Box Result + Send>), } impl MockFile { fn new() -> Self { MockFile(Arc::new(Mutex::new(VecDeque::new()))) } fn expect(&self, action: MockFileAction) { self.0.lock().push_back(action); } fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); } } impl super::FileWriter for MockFile { fn sync_all(&self) -> Result<(), io::Error> { match self .0 .lock() .pop_front() .expect("got sync_all with no expectation") { MockFileAction::SyncAll(f) => f(), _ => panic!("got sync_all, expected something else"), } } fn write(&mut self, buf: &[u8]) -> Result { match self .0 .lock() .pop_front() .expect("got write with no expectation") { MockFileAction::Write(f) => f(buf), _ => panic!("got write({:?}), expected something else", buf), } } } struct Harness { db: Arc>, dir_id: i32, _tmpdir: ::tempfile::TempDir, dir: MockDir, channel: super::SyncerChannel, _shutdown_tx: base::shutdown::Sender, shutdown_rx: base::shutdown::Receiver, syncer: super::Syncer, syncer_rx: mpsc::Receiver>, } fn new_harness(flush_if_sec: i64) -> Harness { let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0)); let tdb = testutil::TestDb::new_with_flush_if_sec(clocks, flush_if_sec); let dir_id = *tdb .db .lock() 
.sample_file_dirs_by_id() .keys() .next() .unwrap(); // This starts a real fs-backed syncer. Get rid of it. tdb.db.lock().clear_on_flush(); drop(tdb.syncer_channel); tdb.syncer_join.join().unwrap(); // Start a mock syncer. let dir = MockDir::new(); let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let syncer = super::Syncer { dir_id: *tdb .db .lock() .sample_file_dirs_by_id() .keys() .next() .unwrap(), dir: dir.clone(), db: tdb.db.clone(), planned_flushes: std::collections::BinaryHeap::new(), shutdown_rx: shutdown_rx.clone(), }; let (syncer_tx, syncer_rx) = mpsc::channel(); tdb.db.lock().on_flush(Box::new({ let snd = syncer_tx.clone(); move || { if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) { warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e); } } })); Harness { dir_id, dir, db: tdb.db, _tmpdir: tdb.tmpdir, channel: super::SyncerChannel(syncer_tx), _shutdown_tx: shutdown_tx, shutdown_rx, syncer, syncer_rx, } } fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") } #[test] fn excessive_pts_jump() { testutil::init(); let mut h = new_harness(0); let video_sample_entry_id = h.db.lock() .insert_video_sample_entry(VideoSampleEntryToInsert { width: 1920, height: 1080, pasp_h_spacing: 1, pasp_v_spacing: 1, data: [0u8; 100].to_vec(), rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new(|_id| Err(nix::Error::EIO)), )); let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|_| Ok(1)))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true) .unwrap(); let e = w .write( &mut h.shutdown_rx, b"2", recording::Time(2), i32::max_value() as i64 + 1, 
true, ) .unwrap_err(); assert!(e.to_string().contains("excessive pts jump")); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); } /// Tests the database flushing while a syncer is still processing a previous flush event. #[test] fn double_flush() { testutil::init(); let mut h = new_harness(0); h.db.lock() .update_retention(&[db::RetentionChange { stream_id: testutil::TEST_STREAM_ID, new_record: true, new_limit: 0, }]) .unwrap(); // Setup: add a 3-byte recording. let video_sample_entry_id = h.db.lock() .insert_video_sample_entry(VideoSampleEntryToInsert { width: 1920, height: 1080, pasp_h_spacing: 1, pasp_v_spacing: 1, data: [0u8; 100].to_vec(), rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); w.close(Some(1)).unwrap(); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); // Then a 1-byte recording. 
let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 1), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Unlink( CompositeId::new(1, 0), Box::new({ let db = h.db.clone(); move |_| { // The drop(w) below should cause the old recording to be deleted (moved to // garbage). When the database is flushed, the syncer forces garbage collection // including this unlink. // Do another database flush here, as if from another syncer. db.lock().flush("another syncer running").unwrap(); Ok(()) } }), )); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); trace!("expecting AsyncSave"); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); trace!("expecting planned flush"); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); trace!("expecting DatabaseFlushed"); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed trace!("expecting DatabaseFlushed again"); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed again f.ensure_done(); h.dir.ensure_done(); // Garbage should be marked collected on the next database flush. { let mut l = h.db.lock(); let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap(); assert!(dir.garbage_needs_unlink.is_empty()); assert!(!dir.garbage_unlinked.is_empty()); l.flush("forced gc").unwrap(); let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap(); assert!(dir.garbage_needs_unlink.is_empty()); assert!(dir.garbage_unlinked.is_empty()); } assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed // The syncer should shut down cleanly. 
drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); } #[test] fn write_path_retries() { testutil::init(); let mut h = new_harness(0); let video_sample_entry_id = h.db.lock() .insert_video_sample_entry(VideoSampleEntryToInsert { width: 1920, height: 1080, pasp_h_spacing: 1, pasp_v_spacing: 1, data: [0u8; 100].to_vec(), rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new(|_id| Err(nix::Error::EIO)), )); let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"1234"); Err(eio()) }))); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"1234"); Ok(1) }))); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"234"); Err(eio()) }))); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"234"); Ok(3) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio())))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"1234", recording::Time(1), 0, true) .unwrap(); h.dir .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO)))); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); { let l = h.db.lock(); let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap(); assert_eq!(s.bytes_to_add, 0); assert_eq!(s.sample_file_bytes, 4); } 
// The syncer should shut down cleanly. drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); } #[test] fn gc_path_retries() { testutil::init(); let mut h = new_harness(0); h.db.lock() .update_retention(&[db::RetentionChange { stream_id: testutil::TEST_STREAM_ID, new_record: true, new_limit: 0, }]) .unwrap(); // Setup: add a 3-byte recording. let video_sample_entry_id = h.db.lock() .insert_video_sample_entry(VideoSampleEntryToInsert { width: 1920, height: 1080, pasp_h_spacing: 1, pasp_v_spacing: 1, data: [0u8; 100].to_vec(), rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); w.close(Some(1)).unwrap(); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); // Then a 1-byte recording. 
let f = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 1), Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) }), )); f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Unlink( CompositeId::new(1, 0), Box::new({ let db = h.db.clone(); move |_| { // The drop(w) below should cause the old recording to be deleted (moved to // garbage). When the database is flushed, the syncer forces garbage collection // including this unlink. // This should have already applied the changes to sample file bytes, even // though the garbage has yet to be collected. let l = db.lock(); let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap(); assert_eq!(s.bytes_to_delete, 0); assert_eq!(s.bytes_to_add, 0); assert_eq!(s.sample_file_bytes, 1); Err(nix::Error::EIO) // force a retry. } }), )); h.dir.expect(MockDirAction::Unlink( CompositeId::new(1, 0), Box::new(|_| Ok(())), )); h.dir .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO)))); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); // Garbage should be marked collected on the next flush. 
{ let mut l = h.db.lock(); let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap(); assert!(dir.garbage_needs_unlink.is_empty()); assert!(!dir.garbage_unlinked.is_empty()); l.flush("forced gc").unwrap(); let dir = l.sample_file_dirs_by_id().get(&h.dir_id).unwrap(); assert!(dir.garbage_needs_unlink.is_empty()); assert!(dir.garbage_unlinked.is_empty()); } assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed // The syncer should shut down cleanly. drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); } #[test] fn planned_flush() { testutil::init(); let mut h = new_harness(60); // flush_if_sec=60 // There's a database constraint forbidding a recording starting at t=0, so advance. h.db.clocks().sleep(time::Duration::seconds(1)); // Setup: add a 3-byte recording. let video_sample_entry_id = h.db.lock() .insert_video_sample_entry(VideoSampleEntryToInsert { width: 1920, height: 1080, pasp_h_spacing: 1, pasp_v_spacing: 1, data: [0u8; 100].to_vec(), rfc6381_codec: "avc1.000000".to_owned(), }) .unwrap(); let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); let f1 = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 0), Box::new({ let f = f1.clone(); move |_id| Ok(f.clone()) }), )); f1.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) }))); f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write( &mut h.shutdown_rx, b"123", recording::Time(recording::TIME_UNITS_PER_SEC), 0, true, ) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); // Flush and let 30 seconds go by. 
h.db.lock().flush("forced").unwrap(); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed assert_eq!(h.syncer.planned_flushes.len(), 1); h.db.clocks().sleep(time::Duration::seconds(30)); // Then, a 1-byte recording. let mut w = Writer::new( &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID, video_sample_entry_id, ); let f2 = MockFile::new(); h.dir.expect(MockDirAction::Create( CompositeId::new(1, 1), Box::new({ let f = f2.clone(); move |_id| Ok(f.clone()) }), )); f2.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) }))); f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write( &mut h.shutdown_rx, b"4", recording::Time(31 * recording::TIME_UNITS_PER_SEC), 1, true, ) .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 2); assert_eq!(h.syncer.planned_flushes.len(), 2); let db_flush_count_before = h.db.lock().flushes(); assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0)); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush (no-op) assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0)); assert_eq!(h.db.lock().flushes(), db_flush_count_before); assert_eq!(h.syncer.planned_flushes.len(), 1); assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0)); assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1); assert_eq!(h.syncer.planned_flushes.len(), 0); assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f1.ensure_done(); f2.ensure_done(); h.dir.ensure_done(); // The syncer should shut down cleanly. drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); } }