1020 lines
39 KiB
Rust
1020 lines
39 KiB
Rust
// This file is part of Moonfire NVR, a security camera network video recorder.
|
|
// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// In addition, as a special exception, the copyright holders give
|
|
// permission to link the code of portions of this program with the
|
|
// OpenSSL library under certain conditions as described in each
|
|
// individual source file, and distribute linked combinations including
|
|
// the two.
|
|
//
|
|
// You must obey the GNU General Public License in all respects for all
|
|
// of the code used other than OpenSSL. If you modify file(s) with this
|
|
// exception, you may extend this exception to your version of the
|
|
// file(s), but you are not obligated to do so. If you do not wish to do
|
|
// so, delete this exception statement from your version. If you delete
|
|
// this exception statement from all source files in the program, then
|
|
// also delete it here.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
//! Sample file directory management.
|
|
//!
|
|
//! This includes opening files for serving, rotating away old files, and saving new files.
|
|
|
|
use base::clock::{self, Clocks};
|
|
use db::{self, CompositeId};
|
|
use dir;
|
|
use failure::Error;
|
|
use fnv::FnvHashMap;
|
|
use parking_lot::Mutex;
|
|
use recording;
|
|
use openssl::hash;
|
|
use std::cmp;
|
|
use std::io;
|
|
use std::mem;
|
|
use std::os::unix::ffi::OsStrExt;
|
|
use std::sync::Arc;
|
|
use std::sync::mpsc;
|
|
use std::thread;
|
|
|
|
pub trait DirWriter : 'static + Send {
|
|
type File : FileWriter;
|
|
|
|
fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error>;
|
|
fn sync(&self) -> Result<(), io::Error>;
|
|
fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error>;
|
|
}
|
|
|
|
pub trait FileWriter : 'static {
|
|
/// As in `std::fs::File::sync_all`.
|
|
fn sync_all(&self) -> Result<(), io::Error>;
|
|
|
|
/// As in `std::io::Writer::write`.
|
|
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error>;
|
|
}
|
|
|
|
impl DirWriter for Arc<dir::SampleFileDir> {
|
|
type File = ::std::fs::File;
|
|
|
|
fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error> {
|
|
dir::SampleFileDir::create_file(self, id)
|
|
}
|
|
fn sync(&self) -> Result<(), io::Error> { dir::SampleFileDir::sync(self) }
|
|
fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
|
|
dir::SampleFileDir::unlink_file(self, id)
|
|
}
|
|
}
|
|
|
|
impl FileWriter for ::std::fs::File {
|
|
fn sync_all(&self) -> Result<(), io::Error> { self.sync_all() }
|
|
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { io::Write::write(self, buf) }
|
|
}
|
|
|
|
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
|
|
enum SyncerCommand<F> {
|
|
AsyncSaveRecording(CompositeId, F),
|
|
DatabaseFlushed,
|
|
Flush(mpsc::SyncSender<()>),
|
|
}
|
|
|
|
/// A channel which can be used to send commands to the syncer.
|
|
/// Can be cloned to allow multiple threads to send commands.
|
|
pub struct SyncerChannel<F>(mpsc::Sender<SyncerCommand<F>>);
|
|
|
|
impl<F> ::std::clone::Clone for SyncerChannel<F> {
|
|
fn clone(&self) -> Self { SyncerChannel(self.0.clone()) }
|
|
}
|
|
|
|
/// State of the worker thread.
|
|
struct Syncer<C: Clocks, D: DirWriter> {
|
|
clocks: C,
|
|
dir_id: i32,
|
|
dir: D,
|
|
db: Arc<db::Database>,
|
|
}
|
|
|
|
/// Starts a syncer for the given sample file directory.
|
|
///
|
|
/// The lock must not be held on `db` when this is called.
|
|
///
|
|
/// There should be only one syncer per directory, or 0 if operating in read-only mode.
|
|
/// This function will perform the initial rotation synchronously, so that it is finished before
|
|
/// file writing starts. Afterward the syncing happens in a background thread.
|
|
///
|
|
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
|
|
/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
|
|
/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
|
|
/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
|
|
/// be persisted.
|
|
///
|
|
/// Note that dropping all `SyncerChannel` clones currently includes calling
|
|
/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
|
|
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
|
|
pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
|
|
-> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> {
|
|
let db2 = db.clone();
|
|
let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
|
|
syncer.initial_rotation()?;
|
|
let (snd, rcv) = mpsc::channel();
|
|
db.lock().on_flush(Box::new({
|
|
let snd = snd.clone();
|
|
move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
|
|
warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
|
|
}
|
|
}));
|
|
Ok((SyncerChannel(snd),
|
|
thread::Builder::new()
|
|
.name(format!("sync-{}", path))
|
|
.spawn(move || syncer.run(rcv)).unwrap()))
|
|
}
|
|
|
|
pub struct NewLimit {
|
|
pub stream_id: i32,
|
|
pub limit: i64,
|
|
}
|
|
|
|
/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
|
|
/// Note this doesn't change the limit in the database; it only deletes files.
|
|
/// Pass a limit of 0 to delete all recordings associated with a camera.
|
|
pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
|
|
-> Result<(), Error> {
|
|
let db2 = db.clone();
|
|
let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
|
|
syncer.do_rotation(|db| {
|
|
for l in limits {
|
|
let (bytes_before, extra);
|
|
{
|
|
let stream = db.streams_by_id().get(&l.stream_id)
|
|
.ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
|
|
bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
|
|
stream.bytes_to_delete;
|
|
extra = stream.retain_bytes - l.limit;
|
|
}
|
|
if l.limit >= bytes_before { continue }
|
|
delete_recordings(db, l.stream_id, extra)?;
|
|
let stream = db.streams_by_id().get(&l.stream_id).unwrap();
|
|
info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
|
|
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
|
|
}
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
/// Deletes recordings to bring a stream's disk usage within bounds.
|
|
fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
|
|
extra_bytes_needed: i64) -> Result<(), Error> {
|
|
let bytes_needed = {
|
|
let stream = match db.streams_by_id().get(&stream_id) {
|
|
None => bail!("no stream {}", stream_id),
|
|
Some(s) => s,
|
|
};
|
|
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
|
|
- stream.retain_bytes
|
|
};
|
|
let mut bytes_to_delete = 0;
|
|
if bytes_needed <= 0 {
|
|
debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
|
|
return Ok(());
|
|
}
|
|
let mut n = 0;
|
|
db.delete_oldest_recordings(stream_id, &mut |row| {
|
|
if bytes_needed >= bytes_to_delete {
|
|
bytes_to_delete += row.sample_file_bytes as i64;
|
|
n += 1;
|
|
return true;
|
|
}
|
|
false
|
|
})?;
|
|
info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
|
|
stream_id, bytes_to_delete, n, bytes_needed);
|
|
Ok(())
|
|
}
|
|
|
|
impl<F: FileWriter> SyncerChannel<F> {
|
|
/// Asynchronously syncs the given writer, closes it, records it into the database, and
|
|
/// starts rotation.
|
|
fn async_save_recording(&self, id: CompositeId, f: F) {
|
|
self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap();
|
|
}
|
|
|
|
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
|
|
pub fn flush(&self) {
|
|
let (snd, rcv) = mpsc::sync_channel(0);
|
|
self.0.send(SyncerCommand::Flush(snd)).unwrap();
|
|
rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
|
|
}
|
|
}
|
|
|
|
impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
|
|
fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
|
|
-> Result<(Self, String), Error> {
|
|
let d = l.sample_file_dirs_by_id()
|
|
.get(&dir_id)
|
|
.ok_or_else(|| format_err!("no dir {}", dir_id))?;
|
|
let dir = d.get()?;
|
|
|
|
// Abandon files.
|
|
// First, get a list of the streams in question.
|
|
let streams_to_next: FnvHashMap<_, _> =
|
|
l.streams_by_id()
|
|
.iter()
|
|
.filter_map(|(&k, v)| {
|
|
if v.sample_file_dir_id == Some(dir_id) {
|
|
Some((k, v.next_recording_id))
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
|
|
let mut undeletable = 0;
|
|
for &id in &to_abandon {
|
|
if let Err(e) = dir.unlink_file(id) {
|
|
if e.kind() == io::ErrorKind::NotFound {
|
|
warn!("dir: abandoned recording {} already deleted!", id);
|
|
} else {
|
|
warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
|
|
undeletable += 1;
|
|
}
|
|
}
|
|
}
|
|
if undeletable > 0 {
|
|
bail!("Unable to delete {} abandoned recordings.", undeletable);
|
|
}
|
|
|
|
Ok((Syncer {
|
|
clocks: clock::RealClocks{},
|
|
dir_id,
|
|
dir,
|
|
db,
|
|
}, d.path.clone()))
|
|
}
|
|
|
|
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
|
|
/// on opening.
|
|
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
|
|
-> Result<Vec<CompositeId>, Error> {
|
|
let mut v = Vec::new();
|
|
for e in ::std::fs::read_dir(path)? {
|
|
let e = e?;
|
|
let id = match dir::parse_id(e.file_name().as_bytes()) {
|
|
Ok(i) => i,
|
|
Err(_) => continue,
|
|
};
|
|
let next = match streams_to_next.get(&id.stream()) {
|
|
Some(n) => *n,
|
|
None => continue, // unknown stream.
|
|
};
|
|
if id.recording() >= next {
|
|
v.push(id);
|
|
}
|
|
}
|
|
Ok(v)
|
|
}
|
|
|
|
/// Rotates files for all streams and deletes stale files from previous runs.
|
|
/// Called from main thread.
|
|
fn initial_rotation(&mut self) -> Result<(), Error> {
|
|
self.do_rotation(|db| {
|
|
let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
|
|
for &stream_id in &streams {
|
|
delete_recordings(db, stream_id, 0)?;
|
|
}
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
/// Helper to do initial or retention-lowering rotation. Called from main thread.
|
|
fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
|
|
where F: Fn(&mut db::LockedDatabase) -> Result<(), Error> {
|
|
{
|
|
let mut db = self.db.lock();
|
|
delete_recordings(&mut *db)?;
|
|
db.flush("synchronous deletion")?;
|
|
}
|
|
let mut garbage: Vec<_> = {
|
|
let l = self.db.lock();
|
|
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
|
|
d.garbage.iter().map(|id| *id).collect()
|
|
};
|
|
if !garbage.is_empty() {
|
|
// Try to delete files; retain ones in `garbage` that don't exist.
|
|
let mut errors = 0;
|
|
for &id in &garbage {
|
|
if let Err(e) = self.dir.unlink_file(id) {
|
|
if e.kind() != io::ErrorKind::NotFound {
|
|
warn!("dir: Unable to unlink {}: {}", id, e);
|
|
errors += 1;
|
|
}
|
|
}
|
|
}
|
|
if errors > 0 {
|
|
bail!("Unable to unlink {} files (see earlier warning messages for details)",
|
|
errors);
|
|
}
|
|
self.dir.sync()?;
|
|
self.db.lock().delete_garbage(self.dir_id, &mut garbage)?;
|
|
self.db.lock().flush("synchronous garbage collection")?;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl<C: Clocks, D: DirWriter> Syncer<C, D> {
|
|
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
|
|
loop {
|
|
match cmds.recv() {
|
|
Err(_) => return, // all senders have closed the channel; shutdown
|
|
Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f),
|
|
Ok(SyncerCommand::DatabaseFlushed) => self.collect_garbage(),
|
|
Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
|
|
};
|
|
}
|
|
}
|
|
|
|
/// Collects garbage (without forcing a sync). Called from worker thread.
|
|
fn collect_garbage(&mut self) {
|
|
let mut garbage: Vec<_> = {
|
|
let l = self.db.lock();
|
|
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
|
|
d.garbage.iter().map(|id| *id).collect()
|
|
};
|
|
if garbage.is_empty() {
|
|
return;
|
|
}
|
|
let c = &self.clocks;
|
|
for &id in &garbage {
|
|
c.retry_forever(&mut || {
|
|
if let Err(e) = self.dir.unlink_file(id) {
|
|
if e.kind() == io::ErrorKind::NotFound {
|
|
warn!("dir: recording {} already deleted!", id);
|
|
return Ok(());
|
|
}
|
|
return Err(e);
|
|
}
|
|
Ok(())
|
|
});
|
|
}
|
|
c.retry_forever(&mut || self.dir.sync());
|
|
c.retry_forever(&mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage));
|
|
}
|
|
|
|
/// Saves the given recording and causes rotation to happen. Called from worker thread.
|
|
///
|
|
/// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
|
|
/// so that there can be only one dir sync and database transaction per save.
|
|
/// Internal helper for `save`. This is separated out so that the question-mark operator
|
|
/// can be used in the many error paths.
|
|
fn save(&mut self, id: CompositeId, f: D::File) {
|
|
let stream_id = id.stream();
|
|
|
|
// Free up a like number of bytes.
|
|
self.clocks.retry_forever(&mut || f.sync_all());
|
|
self.clocks.retry_forever(&mut || self.dir.sync());
|
|
let mut db = self.db.lock();
|
|
db.mark_synced(id).unwrap();
|
|
delete_recordings(&mut db, stream_id, 0).unwrap();
|
|
let reason = {
|
|
let s = db.streams_by_id().get(&stream_id).unwrap();
|
|
let c = db.cameras_by_id().get(&s.camera_id).unwrap();
|
|
let unflushed = s.unflushed();
|
|
if unflushed < s.flush_if {
|
|
debug!("{}-{}: unflushed={} < if={}, not flushing",
|
|
c.short_name, s.type_.as_str(), unflushed, s.flush_if);
|
|
return;
|
|
}
|
|
format!("{}-{}: unflushed={} >= if={}",
|
|
c.short_name, s.type_.as_str(), unflushed, s.flush_if)
|
|
};
|
|
|
|
if let Err(e) = db.flush(&reason) {
|
|
// Don't retry the commit now in case it causes extra flash write cycles.
|
|
// It's not necessary for correctness to flush before proceeding.
|
|
// Just wait until the next flush would happen naturally.
|
|
warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}",
|
|
reason, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Struct for writing a single run (of potentially several recordings) to disk and committing its
|
|
/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
|
|
/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
|
|
/// or panics (if further writing on this stream is impossible).
|
|
pub struct Writer<'a, C: Clocks, D: DirWriter> {
|
|
clocks: &'a C,
|
|
dir: &'a D,
|
|
db: &'a db::Database,
|
|
channel: &'a SyncerChannel<D::File>,
|
|
stream_id: i32,
|
|
video_sample_entry_id: i32,
|
|
state: WriterState<D::File>,
|
|
}
|
|
|
|
enum WriterState<F: FileWriter> {
|
|
Unopened,
|
|
Open(InnerWriter<F>),
|
|
Closed(PreviousWriter),
|
|
}
|
|
|
|
/// State for writing a single recording, used within `Writer`.
|
|
///
|
|
/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel`
|
|
/// with at least one sample. The sample may have zero duration.
|
|
struct InnerWriter<F: FileWriter> {
|
|
f: F,
|
|
r: Arc<Mutex<db::RecordingToInsert>>,
|
|
e: recording::SampleIndexEncoder,
|
|
id: CompositeId,
|
|
hasher: hash::Hasher,
|
|
|
|
/// The start time of this segment, based solely on examining the local clock after frames in
|
|
/// this segment were received. Frames can suffer from various kinds of delay (initial
|
|
/// buffering, encoding, and network transmission), so this time is set to far in the future on
|
|
/// construction, given a real value on the first packet, and decreased as less-delayed packets
|
|
/// are discovered. See design/time.md for details.
|
|
local_start: recording::Time,
|
|
|
|
adjuster: ClockAdjuster,
|
|
|
|
/// A sample which has been written to disk but not added to `index`. Index writes are one
|
|
/// sample behind disk writes because the duration of a sample is the difference between its
|
|
/// pts and the next sample's pts. A sample is flushed when the next sample is written, when
|
|
/// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
|
|
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
|
|
///
|
|
/// Invariant: this should always be `Some` (briefly violated during `write` call only).
|
|
unflushed_sample: Option<UnflushedSample>,
|
|
}
|
|
|
|
/// Adjusts durations given by the camera to correct its clock frequency error.
|
|
#[derive(Copy, Clone, Debug)]
|
|
struct ClockAdjuster {
|
|
/// Every `every_minus_1 + 1` units, add `-ndir`.
|
|
/// Note i32::max_value() disables adjustment.
|
|
every_minus_1: i32,
|
|
|
|
/// Should be 1 or -1 (unless disabled).
|
|
ndir: i32,
|
|
|
|
/// Keeps accumulated difference from previous values.
|
|
cur: i32,
|
|
}
|
|
|
|
impl ClockAdjuster {
|
|
fn new(local_time_delta: Option<i64>) -> Self {
|
|
// Pick an adjustment rate to correct local_time_delta over the next minute (the
|
|
// desired duration of a single recording). Cap the rate at 500 ppm (which corrects
|
|
// 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing
|
|
// down playback.
|
|
let (every_minus_1, ndir) = match local_time_delta {
|
|
Some(d) if d <= -2700 => (1999, 1),
|
|
Some(d) if d >= 2700 => (1999, -1),
|
|
Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1),
|
|
Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1),
|
|
_ => (i32::max_value(), 0),
|
|
};
|
|
ClockAdjuster{
|
|
every_minus_1,
|
|
ndir,
|
|
cur: 0,
|
|
}
|
|
}
|
|
|
|
fn adjust(&mut self, mut val: i32) -> i32 {
|
|
self.cur += val;
|
|
|
|
// The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
|
|
// cause a duration of 1 to become a duration of 0. It has no effect when increasing
|
|
// durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
|
|
// be newly > self.every_minus_1.)
|
|
while self.cur > self.every_minus_1 && val > self.ndir {
|
|
val -= self.ndir;
|
|
self.cur -= self.every_minus_1 + 1;
|
|
}
|
|
val
|
|
}
|
|
}
|
|
|
|
#[derive(Copy, Clone)]
|
|
struct UnflushedSample {
|
|
local_time: recording::Time,
|
|
pts_90k: i64,
|
|
len: i32,
|
|
is_key: bool,
|
|
}
|
|
|
|
/// State associated with a run's previous recording; used within `Writer`.
|
|
#[derive(Copy, Clone)]
|
|
struct PreviousWriter {
|
|
end: recording::Time,
|
|
local_time_delta: recording::Duration,
|
|
run_offset: i32,
|
|
}
|
|
|
|
impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
|
|
pub fn new(clocks: &'a C, dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel<D::File>,
|
|
stream_id: i32, video_sample_entry_id: i32) -> Self {
|
|
Writer {
|
|
clocks,
|
|
dir,
|
|
db,
|
|
channel,
|
|
stream_id,
|
|
video_sample_entry_id,
|
|
state: WriterState::Unopened,
|
|
}
|
|
}
|
|
|
|
/// Opens a new writer.
|
|
/// This returns a writer that violates the invariant that `unflushed_sample` is `Some`.
|
|
/// The caller (`write`) is responsible for correcting this.
|
|
fn open(&mut self) -> Result<&mut InnerWriter<D::File>, Error> {
|
|
let prev = match self.state {
|
|
WriterState::Unopened => None,
|
|
WriterState::Open(ref mut w) => return Ok(w),
|
|
WriterState::Closed(prev) => Some(prev),
|
|
};
|
|
let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert {
|
|
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
|
|
start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())),
|
|
video_sample_entry_id: self.video_sample_entry_id,
|
|
flags: db::RecordingFlags::Growing as i32,
|
|
..Default::default()
|
|
})?;
|
|
let f = self.clocks.retry_forever(&mut || self.dir.create_file(id));
|
|
|
|
self.state = WriterState::Open(InnerWriter {
|
|
f,
|
|
r,
|
|
e: recording::SampleIndexEncoder::new(),
|
|
id,
|
|
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
|
|
local_start: recording::Time(i64::max_value()),
|
|
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
|
|
unflushed_sample: None,
|
|
});
|
|
match self.state {
|
|
WriterState::Open(ref mut w) => Ok(w),
|
|
_ => unreachable!(),
|
|
}
|
|
}
|
|
|
|
pub fn previously_opened(&self) -> Result<bool, Error> {
|
|
Ok(match self.state {
|
|
WriterState::Unopened => false,
|
|
WriterState::Closed(_) => true,
|
|
WriterState::Open(_) => bail!("open!"),
|
|
})
|
|
}
|
|
|
|
/// Writes a new frame to this segment.
|
|
/// `local_time` should be the local clock's time as of when this packet was received.
|
|
pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
|
|
is_key: bool) -> Result<(), Error> {
|
|
let w = self.open()?;
|
|
|
|
// Note w's invariant that `unflushed_sample` is `None` may currently be violated.
|
|
// We must restore it on all success or error paths.
|
|
|
|
if let Some(unflushed) = w.unflushed_sample.take() {
|
|
let duration = (pts_90k - unflushed.pts_90k) as i32;
|
|
if duration <= 0 {
|
|
// Restore invariant.
|
|
w.unflushed_sample = Some(unflushed);
|
|
bail!("pts not monotonically increasing; got {} then {}",
|
|
unflushed.pts_90k, pts_90k);
|
|
}
|
|
let duration = w.adjuster.adjust(duration);
|
|
w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
|
}
|
|
let mut remaining = pkt;
|
|
while !remaining.is_empty() {
|
|
let written = self.clocks.retry_forever(&mut || w.f.write(remaining));
|
|
remaining = &remaining[written..];
|
|
}
|
|
w.unflushed_sample = Some(UnflushedSample {
|
|
local_time,
|
|
pts_90k,
|
|
len: pkt.len() as i32,
|
|
is_key,
|
|
});
|
|
w.hasher.update(pkt).unwrap();
|
|
Ok(())
|
|
}
|
|
|
|
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
|
|
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
|
|
/// swallowing errors and using a zero duration for the last sample.
|
|
pub fn close(&mut self, next_pts: Option<i64>) {
|
|
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
|
|
WriterState::Open(w) => {
|
|
let prev = w.close(self.channel, next_pts);
|
|
WriterState::Closed(prev)
|
|
},
|
|
s => s,
|
|
};
|
|
}
|
|
}
|
|
|
|
impl<F: FileWriter> InnerWriter<F> {
|
|
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
|
|
pkt_local_time: recording::Time) {
|
|
let mut l = self.r.lock();
|
|
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
|
|
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
|
|
self.local_start = cmp::min(self.local_start, new);
|
|
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
|
|
l.start = self.local_start;
|
|
}
|
|
}
|
|
|
|
fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>) -> PreviousWriter {
|
|
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
|
|
let (duration, flags) = match next_pts {
|
|
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
|
|
Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
|
|
};
|
|
let mut sha1_bytes = [0u8; 20];
|
|
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
|
|
let (local_time_delta, run_offset, end);
|
|
self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
|
|
{
|
|
let mut l = self.r.lock();
|
|
l.flags = flags;
|
|
local_time_delta = self.local_start - l.start;
|
|
l.local_time_delta = local_time_delta;
|
|
l.sample_file_sha1 = sha1_bytes;
|
|
run_offset = l.run_offset;
|
|
end = l.start + recording::Duration(l.duration_90k as i64);
|
|
}
|
|
drop(self.r);
|
|
channel.async_save_recording(self.id, self.f);
|
|
PreviousWriter {
|
|
end,
|
|
local_time_delta,
|
|
run_offset,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, C: Clocks, D: DirWriter> Drop for Writer<'a, C, D> {
|
|
fn drop(&mut self) {
|
|
if ::std::thread::panicking() {
|
|
// This will probably panic again. Don't do it.
|
|
return;
|
|
}
|
|
if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) {
|
|
// Swallow any error. The caller should only drop the Writer without calling close()
|
|
// if there's already been an error. The caller should report that. No point in
|
|
// complaining again.
|
|
let _ = w.close(self.channel, None);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use base::clock::SimulatedClocks;
|
|
use db::{self, CompositeId};
|
|
use parking_lot::Mutex;
|
|
use recording;
|
|
use std::collections::VecDeque;
|
|
use std::io;
|
|
use std::sync::Arc;
|
|
use std::sync::mpsc;
|
|
use super::{ClockAdjuster, Writer};
|
|
use testutil;
|
|
|
|
#[derive(Clone)]
|
|
struct MockDir(Arc<Mutex<VecDeque<MockDirAction>>>);
|
|
|
|
enum MockDirAction {
|
|
Create(CompositeId, Box<Fn(CompositeId) -> Result<MockFile, io::Error> + Send>),
|
|
Sync(Box<Fn() -> Result<(), io::Error> + Send>),
|
|
Unlink(CompositeId, Box<Fn(CompositeId) -> Result<(), io::Error> + Send>),
|
|
}
|
|
|
|
impl MockDir {
|
|
fn new() -> Self { MockDir(Arc::new(Mutex::new(VecDeque::new()))) }
|
|
fn expect(&self, action: MockDirAction) { self.0.lock().push_back(action); }
|
|
fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); }
|
|
}
|
|
|
|
impl super::DirWriter for MockDir {
|
|
type File = MockFile;
|
|
|
|
fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error> {
|
|
match self.0.lock().pop_front().expect("got create_file with no expectation") {
|
|
MockDirAction::Create(expected_id, ref f) => {
|
|
assert_eq!(id, expected_id);
|
|
f(id)
|
|
},
|
|
_ => panic!("got create_file({}), expected something else", id),
|
|
}
|
|
}
|
|
fn sync(&self) -> Result<(), io::Error> {
|
|
match self.0.lock().pop_front().expect("got sync with no expectation") {
|
|
MockDirAction::Sync(f) => f(),
|
|
_ => panic!("got sync, expected something else"),
|
|
}
|
|
}
|
|
fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
|
|
match self.0.lock().pop_front().expect("got unlink_file with no expectation") {
|
|
MockDirAction::Unlink(expected_id, f) => {
|
|
assert_eq!(id, expected_id);
|
|
f(id)
|
|
},
|
|
_ => panic!("got unlink({}), expected something else", id),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for MockDir {
|
|
fn drop(&mut self) {
|
|
if !::std::thread::panicking() {
|
|
assert_eq!(self.0.lock().len(), 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct MockFile(Arc<Mutex<VecDeque<MockFileAction>>>);
|
|
|
|
enum MockFileAction {
|
|
SyncAll(Box<Fn() -> Result<(), io::Error> + Send>),
|
|
Write(Box<Fn(&[u8]) -> Result<usize, io::Error> + Send>),
|
|
}
|
|
|
|
impl MockFile {
|
|
fn new() -> Self { MockFile(Arc::new(Mutex::new(VecDeque::new()))) }
|
|
fn expect(&self, action: MockFileAction) { self.0.lock().push_back(action); }
|
|
fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); }
|
|
}
|
|
|
|
impl super::FileWriter for MockFile {
|
|
fn sync_all(&self) -> Result<(), io::Error> {
|
|
match self.0.lock().pop_front().expect("got sync_all with no expectation") {
|
|
MockFileAction::SyncAll(f) => f(),
|
|
_ => panic!("got sync_all, expected something else"),
|
|
}
|
|
}
|
|
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
|
|
match self.0.lock().pop_front().expect("got write with no expectation") {
|
|
MockFileAction::Write(f) => f(buf),
|
|
_ => panic!("got write({:?}), expected something else", buf),
|
|
}
|
|
}
|
|
}
|
|
|
|
struct Harness {
|
|
clocks: SimulatedClocks,
|
|
db: Arc<db::Database>,
|
|
dir_id: i32,
|
|
_tmpdir: ::tempdir::TempDir,
|
|
dir: MockDir,
|
|
channel: super::SyncerChannel<MockFile>,
|
|
join: ::std::thread::JoinHandle<()>,
|
|
}
|
|
|
|
fn new_harness() -> Harness {
|
|
let tdb = testutil::TestDb::new();
|
|
let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap();
|
|
|
|
// This starts a real fs-backed syncer. Get rid of it.
|
|
tdb.db.lock().clear_on_flush();
|
|
drop(tdb.syncer_channel);
|
|
tdb.syncer_join.join().unwrap();
|
|
|
|
// Start a mocker syncer.
|
|
let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
|
|
let dir = MockDir::new();
|
|
let mut syncer = super::Syncer {
|
|
clocks: clocks.clone(),
|
|
dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
|
|
dir: dir.clone(),
|
|
db: tdb.db.clone(),
|
|
};
|
|
let (snd, rcv) = mpsc::channel();
|
|
tdb.db.lock().on_flush(Box::new({
|
|
let snd = snd.clone();
|
|
move || if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
|
|
warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
|
|
}
|
|
}));
|
|
let join = ::std::thread::Builder::new()
|
|
.name("mock-syncer".to_owned())
|
|
.spawn(move || syncer.run(rcv)).unwrap();
|
|
|
|
Harness {
|
|
clocks,
|
|
dir_id,
|
|
dir,
|
|
db: tdb.db,
|
|
_tmpdir: tdb.tmpdir,
|
|
channel: super::SyncerChannel(snd),
|
|
join,
|
|
}
|
|
}
|
|
|
|
fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") }
|
|
|
|
#[test]
|
|
fn write_path_retries() {
|
|
testutil::init();
|
|
let h = new_harness();
|
|
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
|
|
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
|
|
{
|
|
let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
|
video_sample_entry_id);
|
|
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
|
|
let f = MockFile::new();
|
|
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
|
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| {
|
|
assert_eq!(buf, b"1234");
|
|
Err(eio())
|
|
})));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| {
|
|
assert_eq!(buf, b"1234");
|
|
Ok(1)
|
|
})));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| {
|
|
assert_eq!(buf, b"234");
|
|
Err(eio())
|
|
})));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| {
|
|
assert_eq!(buf, b"234");
|
|
Ok(3)
|
|
})));
|
|
f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
|
|
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
|
w.write(b"1234", recording::Time(1), 0, true).unwrap();
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
|
drop(w);
|
|
h.channel.flush();
|
|
f.ensure_done();
|
|
h.dir.ensure_done();
|
|
}
|
|
|
|
{
|
|
let l = h.db.lock();
|
|
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
|
|
assert_eq!(s.bytes_to_add, 0);
|
|
assert_eq!(s.sample_file_bytes, 4);
|
|
}
|
|
drop(h.channel);
|
|
h.db.lock().clear_on_flush();
|
|
h.join.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn gc_path_retries() {
|
|
testutil::init();
|
|
let h = new_harness();
|
|
h.db.lock().update_retention(&[db::RetentionChange {
|
|
stream_id: testutil::TEST_STREAM_ID,
|
|
new_record: true,
|
|
new_limit: 3,
|
|
}]).unwrap();
|
|
|
|
// Setup: add a 3-byte recording.
|
|
let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
|
|
1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
|
|
{
|
|
let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
|
video_sample_entry_id);
|
|
let f = MockFile::new();
|
|
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
|
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
|
|
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
|
w.write(b"123", recording::Time(2), 0, true).unwrap();
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
|
w.close(Some(1));
|
|
h.channel.flush();
|
|
f.ensure_done();
|
|
h.dir.ensure_done();
|
|
|
|
// Then a 1-byte recording.
|
|
let f = MockFile::new();
|
|
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
|
|
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
|
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
|
|
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
|
w.write(b"4", recording::Time(3), 1, true).unwrap();
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
|
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
|
|
let db = h.db.clone();
|
|
move |_| {
|
|
// The drop(w) below should cause the old recording to be deleted (moved to
|
|
// garbage). When the database is flushed, the syncer forces garbage collection
|
|
// including this unlink.
|
|
|
|
// This should have already applied the changes to sample file bytes, even
|
|
// though the garbage has yet to be collected.
|
|
let l = db.lock();
|
|
let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
|
|
assert_eq!(s.bytes_to_delete, 0);
|
|
assert_eq!(s.bytes_to_add, 0);
|
|
assert_eq!(s.sample_file_bytes, 1);
|
|
Err(eio()) // force a retry.
|
|
}
|
|
})));
|
|
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
|
|
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
|
drop(w);
|
|
h.channel.flush(); // wait until the Save...
|
|
h.channel.flush(); // ...and the DatabaseFlush are processed.
|
|
f.ensure_done();
|
|
h.dir.ensure_done();
|
|
}
|
|
|
|
// Garbage should be marked collected on the next flush.
|
|
{
|
|
let mut l = h.db.lock();
|
|
assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
|
|
l.flush("forced gc").unwrap();
|
|
assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
|
|
}
|
|
|
|
// The syncer should shut down cleanly.
|
|
drop(h.channel);
|
|
h.db.lock().clear_on_flush();
|
|
h.join.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn adjust() {
|
|
testutil::init();
|
|
|
|
// no-ops.
|
|
for v in &[None, Some(0), Some(-10), Some(10)] {
|
|
let mut a = ClockAdjuster::new(*v);
|
|
for _ in 0..1800 {
|
|
assert_eq!(3000, a.adjust(3000), "v={:?}", *v);
|
|
}
|
|
}
|
|
|
|
// typical, 100 ppm adjustment.
|
|
let mut a = ClockAdjuster::new(Some(-540));
|
|
let mut total = 0;
|
|
for _ in 0..1800 {
|
|
let new = a.adjust(3000);
|
|
assert!(new == 2999 || new == 3000);
|
|
total += new;
|
|
}
|
|
let expected = 1800*3000 - 540;
|
|
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
|
|
total, expected);
|
|
|
|
a = ClockAdjuster::new(Some(540));
|
|
let mut total = 0;
|
|
for _ in 0..1800 {
|
|
let new = a.adjust(3000);
|
|
assert!(new == 3000 || new == 3001);
|
|
total += new;
|
|
}
|
|
let expected = 1800*3000 + 540;
|
|
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
|
|
total, expected);
|
|
|
|
// capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
|
|
a = ClockAdjuster::new(Some(-1_000_000));
|
|
total = 0;
|
|
for _ in 0..1800 {
|
|
let new = a.adjust(3000);
|
|
assert!(new == 2998 || new == 2999, "new={}", new);
|
|
total += new;
|
|
}
|
|
let expected = 1800*3000 - 2700;
|
|
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
|
|
total, expected);
|
|
|
|
a = ClockAdjuster::new(Some(1_000_000));
|
|
total = 0;
|
|
for _ in 0..1800 {
|
|
let new = a.adjust(3000);
|
|
assert!(new == 3001 || new == 3002, "new={}", new);
|
|
total += new;
|
|
}
|
|
let expected = 1800*3000 + 2700;
|
|
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
|
|
total, expected);
|
|
}
|
|
}
|