diff --git a/Cargo.lock b/Cargo.lock
index 3d7d515..7d647ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -489,6 +489,17 @@ dependencies = [
  "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "moonfire-base"
+version = "0.0.1"
+dependencies = [
+ "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "moonfire-db"
 version = "0.0.1"
@@ -499,6 +510,7 @@ dependencies = [
  "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "moonfire-base 0.0.1",
  "mylog 0.1.0 (git+https://github.com/scottlamb/mylog)",
  "openssl 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -538,6 +550,7 @@ dependencies = [
  "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "moonfire-base 0.0.1",
  "moonfire-db 0.0.1",
  "moonfire-ffmpeg 0.0.1",
  "mylog 0.1.0 (git+https://github.com/scottlamb/mylog)",
diff --git a/Cargo.toml b/Cargo.toml
index f9d27bc..c2940f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,7 +14,7 @@ nightly = ["moonfire-db/nightly", "parking_lot/nightly"]
 bundled = ["rusqlite/bundled"]
 
 [workspace]
-members = ["db", "ffmpeg"]
+members = ["base", "db", "ffmpeg"]
 
 [dependencies]
 byteorder = "1.0"
@@ -30,6 +30,7 @@ libc = "0.2"
 log = { version = "0.4", features = ["release_max_level_info"] }
 memmap = "0.6"
 mime = "0.3"
+moonfire-base = { path = "base" }
 moonfire-db = { path = "db" }
 moonfire-ffmpeg = { path = "ffmpeg" }
 mylog = { git = "https://github.com/scottlamb/mylog" }
diff --git a/base/Cargo.toml b/base/Cargo.toml
new file mode 100644
index 0000000..d30a722
--- /dev/null
+++ b/base/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "moonfire-base"
+version = "0.0.1"
+authors = ["Scott Lamb <slamb@slamb.org>"]
+readme = "../README.md"
+
+[features]
+nightly = []
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+failure = "0.1.1"
+libc = "0.2"
+log = "0.4"
+parking_lot = { version = "0.5", features = [] }
+time = "0.1"
diff --git a/src/clock.rs b/base/clock.rs
similarity index 74%
rename from src/clock.rs
rename to base/clock.rs
index 7e547dd..287efb9 100644
--- a/src/clock.rs
+++ b/base/clock.rs
@@ -1,5 +1,5 @@
-// This file is part of Moonfire NVR, a security camera digital video recorder.
-// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
+// This file is part of Moonfire NVR, a security camera network video recorder.
+// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -30,14 +30,16 @@
 
 //! Clock interface and implementations for testability.
 
+use failure::Error;
 use libc;
-#[cfg(test)] use parking_lot::Mutex;
+use parking_lot::Mutex;
 use std::mem;
+use std::sync::Arc;
 use std::thread;
 use time::{Duration, Timespec};
 
 /// Abstract interface to the system clocks. This is for testability.
-pub trait Clocks : Sync {
+pub trait Clocks : Clone + Sync + 'static {
     /// Gets the current time from `CLOCK_REALTIME`.
     fn realtime(&self) -> Timespec;
@@ -46,12 +48,21 @@ pub trait Clocks : Sync {
 
     /// Causes the current thread to sleep for the specified time.
     fn sleep(&self, how_long: Duration);
+
+    fn retry_forever<T, E: Into<Error>>(&self, f: &mut FnMut() -> Result<T, E>) -> T {
+        loop {
+            let e = match f() {
+                Ok(t) => return t,
+                Err(e) => e.into(),
+            };
+            let sleep_time = Duration::seconds(1);
+            warn!("sleeping for {:?} after error: {:?}", sleep_time, e);
+            self.sleep(sleep_time);
+        }
+    }
 }
 
-/// Singleton "real" clocks.
-pub static REAL: RealClocks = RealClocks {};
-
-/// Real clocks; see static `REAL` instance.
+#[derive(Clone)]
 pub struct RealClocks {}
 
 impl RealClocks {
@@ -78,13 +89,13 @@ impl Clocks for RealClocks {
 
 /// Logs a warning if the TimerGuard lives "too long", using the label created by a supplied
 /// function.
-pub struct TimerGuard<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> {
+pub struct TimerGuard<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> {
     clocks: &'a C,
     label_f: Option<F>,
     start: Timespec,
 }
 
-impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> {
+impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C, S, F> {
     pub fn new(clocks: &'a C, label_f: F) -> Self {
         TimerGuard {
             clocks,
@@ -94,7 +105,7 @@ impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> TimerGuard<'a, C,
     }
 }
 
-impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> {
+impl<'a, C: Clocks, S: AsRef<str>, F: FnOnce() -> S + 'a> Drop for TimerGuard<'a, C, S, F> {
     fn drop(&mut self) {
         let elapsed = self.clocks.monotonic() - self.start;
         if elapsed.num_seconds() >= 1 {
@@ -105,30 +116,30 @@ impl<'a, C: Clocks + 'a, S: AsRef<str>, F: FnOnce() -> S + 'a> Drop for TimerGua
 }
 
 /// Simulated clock for testing.
-#[cfg(test)]
-pub struct SimulatedClocks {
+#[derive(Clone)]
+pub struct SimulatedClocks(Arc<SimulatedClocksInner>);
+
+struct SimulatedClocksInner {
     boot: Timespec,
     uptime: Mutex<Duration>,
 }
 
-#[cfg(test)]
 impl SimulatedClocks {
-    pub fn new(boot: Timespec) -> SimulatedClocks {
-        SimulatedClocks {
+    pub fn new(boot: Timespec) -> Self {
+        SimulatedClocks(Arc::new(SimulatedClocksInner {
             boot: boot,
             uptime: Mutex::new(Duration::seconds(0)),
-        }
+        }))
     }
 }
 
-#[cfg(test)]
 impl Clocks for SimulatedClocks {
-    fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock() }
-    fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock() }
+    fn realtime(&self) -> Timespec { self.0.boot + *self.0.uptime.lock() }
+    fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.0.uptime.lock() }
 
     /// Advances the clock by the specified amount without actually sleeping.
     fn sleep(&self, how_long: Duration) {
-        let mut l = self.uptime.lock();
+        let mut l = self.0.uptime.lock();
         *l = *l + how_long;
     }
 }
diff --git a/base/lib.rs b/base/lib.rs
new file mode 100644
index 0000000..3c9ea0b
--- /dev/null
+++ b/base/lib.rs
@@ -0,0 +1,37 @@
+// This file is part of Moonfire NVR, a security camera network video recorder.
+// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// In addition, as a special exception, the copyright holders give
+// permission to link the code of portions of this program with the
+// OpenSSL library under certain conditions as described in each
+// individual source file, and distribute linked combinations including
+// the two.
+//
+// You must obey the GNU General Public License in all respects for all
+// of the code used other than OpenSSL. If you modify file(s) with this
+// exception, you may extend this exception to your version of the
+// file(s), but you are not obligated to do so. If you do not wish to do
+// so, delete this exception statement from your version. If you delete
+// this exception statement from all source files in the program, then
+// also delete it here.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+extern crate failure;
+extern crate libc;
+#[macro_use] extern crate log;
+extern crate parking_lot;
+extern crate time;
+
+pub mod clock;
diff --git a/db/Cargo.toml b/db/Cargo.toml
index 0af2418..3f1d7e3 100644
--- a/db/Cargo.toml
+++ b/db/Cargo.toml
@@ -15,8 +15,9 @@ failure = "0.1.1"
 fnv = "1.0"
 lazy_static = "1.0"
 libc = "0.2"
-log = { version = "0.4", features = ["release_max_level_info"] }
+log = "0.4"
 lru-cache = "0.1"
+moonfire-base = { path = "../base" }
 mylog = { git = "https://github.com/scottlamb/mylog" }
 openssl = "0.10"
 parking_lot = { version = "0.5", features = [] }
diff --git a/db/db.rs b/db/db.rs
index 197d2c5..687c189 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -1697,6 +1697,9 @@ pub struct Database(
 
 impl Drop for Database {
     fn drop(&mut self) {
+        if ::std::thread::panicking() {
+            return; // don't flush while panicking.
+        }
         if let Some(m) = self.0.take() {
             if let Err(e) = m.into_inner().flush("drop") {
                 error!("Final database flush failed: {}", e);
diff --git a/db/dir.rs b/db/dir.rs
index 50b6fbc..a71d808 100644
--- a/db/dir.rs
+++ b/db/dir.rs
@@ -1,5 +1,5 @@
-// This file is part of Moonfire NVR, a security camera digital video recorder.
-// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
+// This file is part of Moonfire NVR, a security camera network video recorder.
+// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -32,16 +32,11 @@
 //!
 //! This includes opening files for serving, rotating away old files, and saving new files.
 
-use db::{self, CompositeId};
+use db::CompositeId;
 use failure::{Error, Fail};
-use fnv::FnvHashMap;
 use libc::{self, c_char};
-use parking_lot::Mutex;
 use protobuf::{self, Message};
-use recording;
-use openssl::hash;
 use schema;
-use std::cmp;
 use std::ffi;
 use std::fs;
 use std::io::{self, Read, Write};
@@ -49,8 +44,6 @@ use std::mem;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::FromRawFd;
 use std::sync::Arc;
-use std::sync::mpsc;
-use std::thread;
 
 /// A sample file directory. Typically one per physical disk drive.
 ///
@@ -257,11 +250,16 @@ impl SampleFileDir {
     }
 
     /// Opens the given sample file for reading.
-    pub fn open_sample_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
+    pub fn open_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
         let p = SampleFileDir::get_rel_pathname(composite_id);
         unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) }
     }
 
+    pub fn create_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
+        let p = SampleFileDir::get_rel_pathname(composite_id);
+        unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) }
+    }
+
     pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
         write_meta(&self.fd, meta)
     }
@@ -278,9 +276,9 @@ impl SampleFileDir {
     }
 
     /// Unlinks the given sample file within this directory.
-    fn unlink(fd: &Fd, id: CompositeId) -> Result<(), io::Error> {
+    pub(crate) fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
         let p = SampleFileDir::get_rel_pathname(id);
-        let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) };
+        let res = unsafe { libc::unlinkat(self.fd.0, p.as_ptr(), 0) };
         if res < 0 {
             return Err(io::Error::last_os_error())
         }
@@ -288,612 +286,11 @@ impl SampleFileDir {
     }
 
     /// Syncs the directory itself.
-    fn sync(&self) -> Result<(), io::Error> {
+    pub(crate) fn sync(&self) -> Result<(), io::Error> {
         self.fd.sync()
     }
 }
 
-/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
-enum SyncerCommand {
-    AsyncSaveRecording(CompositeId, fs::File),
-    DatabaseFlushed,
-    Flush(mpsc::SyncSender<()>),
-}
-
-/// A channel which can be used to send commands to the syncer.
-/// Can be cloned to allow multiple threads to send commands.
-#[derive(Clone)]
-pub struct SyncerChannel(mpsc::Sender<SyncerCommand>);
-
-/// State of the worker thread.
-struct Syncer {
-    dir_id: i32,
-    dir: Arc<SampleFileDir>,
-    db: Arc<db::Database>,
-}
-
-/// Starts a syncer for the given sample file directory.
-///
-/// The lock must not be held on `db` when this is called.
-///
-/// There should be only one syncer per directory, or 0 if operating in read-only mode.
-/// This function will perform the initial rotation synchronously, so that it is finished before
-/// file writing starts. Afterward the syncing happens in a background thread.
-///
-/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
-/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
-/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
-/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
-/// be persisted.
-///
-/// Note that dropping all `SyncerChannel` clones currently includes calling
-/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
-/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
-pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
-                    -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> {
-    let db2 = db.clone();
-    let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
-    syncer.initial_rotation()?;
-    let (snd, rcv) = mpsc::channel();
-    db.lock().on_flush(Box::new({
-        let snd = snd.clone();
-        move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
-            warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
-        }
-    }));
-    Ok((SyncerChannel(snd),
-        thread::Builder::new()
-            .name(format!("sync-{}", path))
-            .spawn(move || syncer.run(rcv)).unwrap()))
-}
-
-pub struct NewLimit {
-    pub stream_id: i32,
-    pub limit: i64,
-}
-
-/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
-/// Note this doesn't change the limit in the database; it only deletes files.
-/// Pass a limit of 0 to delete all recordings associated with a camera.
-pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
-                       -> Result<(), Error> {
-    let db2 = db.clone();
-    let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
-    syncer.do_rotation(|db| {
-        for l in limits {
-            let (bytes_before, extra);
-            {
-                let stream = db.streams_by_id().get(&l.stream_id)
-                               .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
-                bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
-                               stream.bytes_to_delete;
-                extra = stream.retain_bytes - l.limit;
-            }
-            if l.limit >= bytes_before { continue }
-            delete_recordings(db, l.stream_id, extra)?;
-            let stream = db.streams_by_id().get(&l.stream_id).unwrap();
-            info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
-                  stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
-        }
-        Ok(())
-    })
-}
-
-/// Deletes recordings to bring a stream's disk usage within bounds.
-fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
-                     extra_bytes_needed: i64) -> Result<(), Error> {
-    let bytes_needed = {
-        let stream = match db.streams_by_id().get(&stream_id) {
-            None => bail!("no stream {}", stream_id),
-            Some(s) => s,
-        };
-        stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
-            - stream.retain_bytes
-    };
-    let mut bytes_to_delete = 0;
-    if bytes_needed <= 0 {
-        debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
-        return Ok(());
-    }
-    let mut n = 0;
-    db.delete_oldest_recordings(stream_id, &mut |row| {
-        n += 1;
-        if bytes_needed >= bytes_to_delete {
-            bytes_to_delete += row.sample_file_bytes as i64;
-            n += 1;
-            return true;
-        }
-        false
-    })?;
-    info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
-          stream_id, bytes_to_delete, n, bytes_needed);
-    Ok(())
-}
-
-impl SyncerChannel {
-    /// Asynchronously syncs the given writer, closes it, records it into the database, and
-    /// starts rotation.
-    fn async_save_recording(&self, id: CompositeId, f: fs::File) {
-        self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap();
-    }
-
-    /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
-    pub fn flush(&self) {
-        let (snd, rcv) = mpsc::sync_channel(0);
-        self.0.send(SyncerCommand::Flush(snd)).unwrap();
-        rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
-    }
-}
-
-impl Syncer {
-    fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
-           -> Result<(Self, String), Error> {
-        let d = l.sample_file_dirs_by_id()
-                 .get(&dir_id)
-                 .ok_or_else(|| format_err!("no dir {}", dir_id))?;
-        let dir = d.get()?;
-
-        // Abandon files.
-        // First, get a list of the streams in question.
-        let streams_to_next: FnvHashMap<_, _> =
-            l.streams_by_id()
-             .iter()
-             .filter_map(|(&k, v)| {
-                 if v.sample_file_dir_id == Some(dir_id) {
-                     Some((k, v.next_recording_id))
-                 } else {
-                     None
-                 }
-             })
-             .collect();
-        let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
-        let mut undeletable = 0;
-        for &id in &to_abandon {
-            if let Err(e) = SampleFileDir::unlink(&dir.fd, id) {
-                if e.kind() == io::ErrorKind::NotFound {
-                    warn!("dir: abandoned recording {} already deleted!", id);
-                } else {
-                    warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
-                    undeletable += 1;
-                }
-            }
-        }
-        if undeletable > 0 {
-            bail!("Unable to delete {} abandoned recordings.", undeletable);
-        }
-
-        Ok((Syncer {
-            dir_id,
-            dir,
-            db,
-        }, d.path.clone()))
-    }
-
-    /// Lists files which should be "abandoned" (deleted without ever recording in the database)
-    /// on opening.
-    fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
-                             -> Result<Vec<CompositeId>, Error> {
-        let mut v = Vec::new();
-        for e in ::std::fs::read_dir(path)? {
-            let e = e?;
-            let id = match parse_id(e.file_name().as_bytes()) {
-                Ok(i) => i,
-                Err(_) => continue,
-            };
-            let next = match streams_to_next.get(&id.stream()) {
-                Some(n) => *n,
-                None => continue, // unknown stream.
-            };
-            if id.recording() >= next {
-                v.push(id);
-            }
-        }
-        Ok(v)
-    }
-
-    fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand>) {
-        loop {
-            match cmds.recv() {
-                Err(_) => return, // all senders have closed the channel; shutdown
-                Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f),
-                Ok(SyncerCommand::DatabaseFlushed) => {
-                    retry_forever(&mut || self.collect_garbage(true))
-                },
-                Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
-            };
-        }
-    }
-
-    /// Rotates files for all streams and deletes stale files from previous runs.
-    /// Called from main thread.
-    fn initial_rotation(&mut self) -> Result<(), Error> {
-        self.do_rotation(|db| {
-            let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
-            for &stream_id in &streams {
-                delete_recordings(db, stream_id, 0)?;
-            }
-            Ok(())
-        })
-    }
-
-    /// Helper to do initial or retention-lowering rotation. Called from main thread.
-    fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
-    where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> {
-        {
-            let mut db = self.db.lock();
-            delete_recordings(&mut *db)?;
-            db.flush("synchronous deletion")?;
-        }
-        self.collect_garbage(false)?;
-        self.db.lock().flush("synchronous garbage collection")
-    }
-
-    /// Helper for collecting garbage; called from main or worker threads.
-    fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> {
-        let mut garbage: Vec<_> = {
-            let l = self.db.lock();
-            let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
-            d.garbage.iter().map(|id| *id).collect()
-        };
-        let len_before = garbage.len();
-        garbage.retain(|&id| {
-            if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) {
-                if e.kind() == io::ErrorKind::NotFound {
-                    if warn_on_missing {
-                        warn!("dir: recording {} already deleted!", id);
-                    }
-                } else {
-                    warn!("dir: Unable to unlink {}: {}", id, e);
-                    return false;
-                }
-            }
-            true
-        });
-        let res = if len_before > garbage.len() {
-            Err(format_err!("Unable to unlink {} files (see earlier warning messages for details)",
-                            len_before - garbage.len()))
-        } else {
-            Ok(())
-        };
-        if garbage.is_empty() {
-            // No progress.
-            return res;
-        }
-        if let Err(e) = self.dir.sync() {
-            error!("unable to sync dir: {}", e);
-            return res.and(Err(e.into()));
-        }
-        if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) {
-            error!("unable to delete garbage ({} files) for dir {}: {}",
-                   self.dir_id, garbage.len(), e);
-            return res.and(Err(e.into()));
-        }
-        res
-    }
-
-    /// Saves the given recording and causes rotation to happen. Called from worker thread.
-    ///
-    /// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
-    /// so that there can be only one dir sync and database transaction per save.
-    /// Internal helper for `save`. This is separated out so that the question-mark operator
-    /// can be used in the many error paths.
-    fn save(&mut self, id: CompositeId, f: fs::File) {
-        let stream_id = id.stream();
-
-        // Free up a like number of bytes.
-        retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0));
-        retry_forever(&mut || f.sync_all());
-        retry_forever(&mut || self.dir.sync());
-        let mut db = self.db.lock();
-        db.mark_synced(id).unwrap();
-        let reason = {
-            let s = db.streams_by_id().get(&stream_id).unwrap();
-            let c = db.cameras_by_id().get(&s.camera_id).unwrap();
-            let unflushed = s.unflushed();
-            if unflushed < s.flush_if {
-                debug!("{}-{}: unflushed={} < if={}, not flushing",
-                       c.short_name, s.type_.as_str(), unflushed, s.flush_if);
-                return;
-            }
-            format!("{}-{}: unflushed={} >= if={}",
-                    c.short_name, s.type_.as_str(), unflushed, s.flush_if)
-        };
-
-        if let Err(e) = db.flush(&reason) {
-            // Don't retry the commit now in case it causes extra flash write cycles.
-            // It's not necessary for correctness to flush before proceeding.
-            // Just wait until the next flush would happen naturally.
-            warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}",
-                  reason, e);
-        }
-    }
-}
-
-fn retry_forever<T, E: Into<Error>>(f: &mut FnMut() -> Result<T, E>) -> T {
-    let sleep_time = ::std::time::Duration::new(1, 0);
-    loop {
-        let e = match f() {
-            Ok(t) => return t,
-            Err(e) => e.into(),
-        };
-        warn!("sleeping for {:?} after error: {:?}", sleep_time, e);
-        thread::sleep(sleep_time);
-    }
-}
-
-/// Struct for writing a single run (of potentially several recordings) to disk and committing its
-/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
-/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
-/// or panics (if further writing on this stream is impossible).
-pub struct Writer<'a> {
-    dir: &'a SampleFileDir,
-    db: &'a db::Database,
-    channel: &'a SyncerChannel,
-    stream_id: i32,
-    video_sample_entry_id: i32,
-    state: WriterState,
-}
-
-enum WriterState {
-    Unopened,
-    Open(InnerWriter),
-    Closed(PreviousWriter),
-}
-
-/// State for writing a single recording, used within `Writer`.
-///
-/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel`
-/// with at least one sample. The sample may have zero duration.
-struct InnerWriter {
-    f: fs::File,
-    r: Arc<Mutex<db::RecordingToInsert>>,
-    e: recording::SampleIndexEncoder,
-    id: CompositeId,
-    hasher: hash::Hasher,
-
-    /// The start time of this segment, based solely on examining the local clock after frames in
-    /// this segment were received. Frames can suffer from various kinds of delay (initial
-    /// buffering, encoding, and network transmission), so this time is set to far in the future on
-    /// construction, given a real value on the first packet, and decreased as less-delayed packets
-    /// are discovered. See design/time.md for details.
-    local_start: recording::Time,
-
-    adjuster: ClockAdjuster,
-
-    /// A sample which has been written to disk but not added to `index`. Index writes are one
-    /// sample behind disk writes because the duration of a sample is the difference between its
-    /// pts and the next sample's pts. A sample is flushed when the next sample is written, when
-    /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
-    /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
-    ///
-    /// Invariant: this should always be `Some` (briefly violated during `write` call only).
-    unflushed_sample: Option<UnflushedSample>,
-}
-
-/// Adjusts durations given by the camera to correct its clock frequency error.
-#[derive(Copy, Clone, Debug)]
-struct ClockAdjuster {
-    /// Every `every_minus_1 + 1` units, add `-ndir`.
-    /// Note i32::max_value() disables adjustment.
-    every_minus_1: i32,
-
-    /// Should be 1 or -1 (unless disabled).
-    ndir: i32,
-
-    /// Keeps accumulated difference from previous values.
-    cur: i32,
-}
-
-impl ClockAdjuster {
-    fn new(local_time_delta: Option<i64>) -> Self {
-        // Pick an adjustment rate to correct local_time_delta over the next minute (the
-        // desired duration of a single recording). Cap the rate at 500 ppm (which corrects
-        // 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing
-        // down playback.
-        let (every_minus_1, ndir) = match local_time_delta {
-            Some(d) if d <= -2700 => (1999, 1),
-            Some(d) if d >= 2700 => (1999, -1),
-            Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1),
-            Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1),
-            _ => (i32::max_value(), 0),
-        };
-        ClockAdjuster{
-            every_minus_1,
-            ndir,
-            cur: 0,
-        }
-    }
-
-    fn adjust(&mut self, mut val: i32) -> i32 {
-        self.cur += val;
-
-        // The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
-        // cause a duration of 1 to become a duration of 0. It has no effect when increasing
-        // durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
-        // be newly > self.every_minus_1.)
-        while self.cur > self.every_minus_1 && val > self.ndir {
-            val -= self.ndir;
-            self.cur -= self.every_minus_1 + 1;
-        }
-        val
-    }
-}
-
-#[derive(Copy, Clone)]
-struct UnflushedSample {
-    local_time: recording::Time,
-    pts_90k: i64,
-    len: i32,
-    is_key: bool,
-}
-
-/// State associated with a run's previous recording; used within `Writer`.
-#[derive(Copy, Clone)]
-struct PreviousWriter {
-    end: recording::Time,
-    local_time_delta: recording::Duration,
-    run_offset: i32,
-}
-
-impl<'a> Writer<'a> {
-    pub fn new(dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel,
-               stream_id: i32, video_sample_entry_id: i32) -> Self {
-        Writer {
-            dir,
-            db,
-            channel,
-            stream_id,
-            video_sample_entry_id,
-            state: WriterState::Unopened,
-        }
-    }
-
-    /// Opens a new writer.
-    /// This returns a writer that violates the invariant that `unflushed_sample` is `Some`.
-    /// The caller (`write`) is responsible for correcting this.
-    fn open(&mut self) -> Result<&mut InnerWriter, Error> {
-        let prev = match self.state {
-            WriterState::Unopened => None,
-            WriterState::Open(ref mut w) => return Ok(w),
-            WriterState::Closed(prev) => Some(prev),
-        };
-        let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert {
-            run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
-            start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())),
-            video_sample_entry_id: self.video_sample_entry_id,
-            flags: db::RecordingFlags::Growing as i32,
-            ..Default::default()
-        })?;
-        let p = SampleFileDir::get_rel_pathname(id);
-        let f = retry_forever(&mut || unsafe {
-            self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600)
-        });
-
-        self.state = WriterState::Open(InnerWriter {
-            f,
-            r,
-            e: recording::SampleIndexEncoder::new(),
-            id,
-            hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
-            local_start: recording::Time(i64::max_value()),
-            adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
-            unflushed_sample: None,
-        });
-        match self.state {
-            WriterState::Open(ref mut w) => Ok(w),
-            _ => unreachable!(),
-        }
-    }
-
-    pub fn previously_opened(&self) -> Result<bool, Error> {
-        Ok(match self.state {
-            WriterState::Unopened => false,
-            WriterState::Closed(_) => true,
-            WriterState::Open(_) => bail!("open!"),
-        })
-    }
-
-    /// Writes a new frame to this segment.
-    /// `local_time` should be the local clock's time as of when this packet was received.
-    pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
-                 is_key: bool) -> Result<(), Error> {
-        let w = self.open()?;
-
-        // Note w's invariant that `unflushed_sample` is `None` may currently be violated.
-        // We must restore it on all success or error paths.
-
-        if let Some(unflushed) = w.unflushed_sample.take() {
-            let duration = (pts_90k - unflushed.pts_90k) as i32;
-            if duration <= 0 {
-                // Restore invariant.
-                w.unflushed_sample = Some(unflushed);
-                bail!("pts not monotonically increasing; got {} then {}",
-                      unflushed.pts_90k, pts_90k);
-            }
-            let duration = w.adjuster.adjust(duration);
-            w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
-        }
-        let mut remaining = pkt;
-        while !remaining.is_empty() {
-            let written = retry_forever(&mut || w.f.write(remaining));
-            remaining = &remaining[written..];
-        }
-        w.unflushed_sample = Some(UnflushedSample {
-            local_time,
-            pts_90k,
-            len: pkt.len() as i32,
-            is_key,
-        });
-        w.hasher.update(pkt).unwrap();
-        Ok(())
-    }
-
-    /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
-    /// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
-    /// swallowing errors and using a zero duration for the last sample.
-    pub fn close(&mut self, next_pts: Option<i64>) {
-        self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
-            WriterState::Open(w) => {
-                let prev = w.close(self.channel, next_pts);
-                WriterState::Closed(prev)
-            },
-            s => s,
-        };
-    }
-}
-
-impl InnerWriter {
-    fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
-                  pkt_local_time: recording::Time) {
-        let mut l = self.r.lock();
-        self.e.add_sample(duration_90k, bytes, is_key, &mut l);
-        let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
-        self.local_start = cmp::min(self.local_start, new);
-        if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
-            l.start = self.local_start;
-        }
-    }
-
-    fn close(mut self, channel: &SyncerChannel, next_pts: Option<i64>) -> PreviousWriter {
-        let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
-        let (duration, flags) = match next_pts {
-            None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
-            Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
-        };
-        let mut sha1_bytes = [0u8; 20];
-        sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
-        let (local_time_delta, run_offset, end);
-        self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
-        {
-            let mut l = self.r.lock();
-            l.flags = flags;
-            local_time_delta = self.local_start - l.start;
-            l.local_time_delta = local_time_delta;
-            l.sample_file_sha1 = sha1_bytes;
-            run_offset = l.run_offset;
-            end = l.start + recording::Duration(l.duration_90k as i64);
-        }
-        drop(self.r);
-        channel.async_save_recording(self.id, self.f);
-        PreviousWriter {
-            end,
-            local_time_delta,
-            run_offset,
-        }
-    }
-}
-
-impl<'a> Drop for Writer<'a> {
-    fn drop(&mut self) {
-        if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) {
-            // Swallow any error. The caller should only drop the Writer without calling close()
-            // if there's already been an error. The caller should report that. No point in
-            // complaining again.
-            let _ = w.close(self.channel, None);
-        }
-    }
-}
-
 /// Parse a composite id filename.
 ///
 /// These are exactly 16 bytes, lowercase hex.
@@ -914,68 +311,6 @@ pub(crate) fn parse_id(id: &[u8]) -> Result<CompositeId, ()> {
 
 #[cfg(test)]
 mod tests {
-    use super::ClockAdjuster;
-    use testutil;
-
-    #[test]
-    fn adjust() {
-        testutil::init();
-
-        // no-ops.
-        for v in &[None, Some(0), Some(-10), Some(10)] {
-            let mut a = ClockAdjuster::new(*v);
-            for _ in 0..1800 {
-                assert_eq!(3000, a.adjust(3000), "v={:?}", *v);
-            }
-        }
-
-        // typical, 100 ppm adjustment.
-        let mut a = ClockAdjuster::new(Some(-540));
-        let mut total = 0;
-        for _ in 0..1800 {
-            let new = a.adjust(3000);
-            assert!(new == 2999 || new == 3000);
-            total += new;
-        }
-        let expected = 1800*3000 - 540;
-        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
-                total, expected);
-
-        a = ClockAdjuster::new(Some(540));
-        let mut total = 0;
-        for _ in 0..1800 {
-            let new = a.adjust(3000);
-            assert!(new == 3000 || new == 3001);
-            total += new;
-        }
-        let expected = 1800*3000 + 540;
-        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
-                total, expected);
-
-        // capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
-        a = ClockAdjuster::new(Some(-1_000_000));
-        total = 0;
-        for _ in 0..1800 {
-            let new = a.adjust(3000);
-            assert!(new == 2998 || new == 2999, "new={}", new);
-            total += new;
-        }
-        let expected = 1800*3000 - 2700;
-        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
-                total, expected);
-
-        a = ClockAdjuster::new(Some(1_000_000));
-        total = 0;
-        for _ in 0..1800 {
-            let new = a.adjust(3000);
-            assert!(new == 3001 || new == 3002, "new={}", new);
-            total += new;
-        }
-        let expected = 1800*3000 + 2700;
-        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
-                total, expected);
-    }
-
     #[test]
     fn parse_id() {
         use super::parse_id;
diff --git a/db/lib.rs b/db/lib.rs
index 1bbbc33..b5fa8e4 100644
--- a/db/lib.rs
+++ b/db/lib.rs
@@ -36,12 +36,14 @@ extern crate fnv;
 extern crate libc;
 #[macro_use] extern crate log;
 extern crate lru_cache;
+extern crate moonfire_base as base;
 extern crate mylog;
 extern crate openssl;
 extern crate parking_lot;
 extern crate protobuf;
 extern crate regex;
 extern crate rusqlite;
+extern crate tempdir;
 extern crate time;
 extern crate uuid;
 
@@ -53,6 +55,7 @@ mod raw;
 pub mod recording;
 mod schema;
 pub mod upgrade;
+pub mod writer;
 
 // This is only for #[cfg(test)], but it's also used by the dependent crate, and it appears that
 // #[cfg(test)] is not passed on to dependencies.
diff --git a/db/raw.rs b/db/raw.rs
index 7c2ad78..e0ebae1 100644
--- a/db/raw.rs
+++ b/db/raw.rs
@@ -31,7 +31,7 @@
 //! Raw database access: SQLite statements which do not touch any cached state.
 
 use db::{self, CompositeId, FromSqlUuid};
-use failure::Error;
+use failure::{Error, ResultExt};
 use fnv::FnvHashSet;
 use recording;
 use rusqlite;
@@ -191,7 +191,8 @@ pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result<Uuid, Error> {
 /// Inserts the specified recording (for use from `try_flush` only).
 pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
                                r: &db::RecordingToInsert) -> Result<(), Error> {
-    let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)?;
+    let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)
+        .with_context(|e| format!("can't prepare recording insert: {}", e))?;
     stmt.execute_named(&[
         (":composite_id", &id.0),
         (":stream_id", &(id.stream() as i64)),
@@ -205,15 +206,16 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
         (":video_samples", &r.video_samples),
         (":video_sync_samples", &r.video_sync_samples),
         (":video_sample_entry_id", &r.video_sample_entry_id),
-    ])?;
+    ]).with_context(|e| format!("unable to insert recording for {:#?}: {}", r, e))?;
 
-    let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
+    let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)
+        .with_context(|e| format!("can't prepare recording_playback insert: {}", e))?;
     let sha1 = &r.sample_file_sha1[..];
     stmt.execute_named(&[
         (":composite_id", &id.0),
         (":sample_file_sha1", &sha1),
         (":video_index", &r.video_index),
-    ])?;
+    ]).with_context(|e| format!("unable to insert recording_playback for {:#?}: {}", r, e))?;
     Ok(())
 }
 
diff --git a/db/testutil.rs b/db/testutil.rs
index 8e57373..cde4d6f 100644
--- a/db/testutil.rs
+++ b/db/testutil.rs
@@ -28,8 +28,6 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-extern crate tempdir;
-
 use db;
 use dir;
 use fnv::FnvHashMap;
@@ -38,8 +36,10 @@ use rusqlite;
 use std::env;
 use std::sync::{self, Arc};
 use std::thread;
+use tempdir::TempDir;
 use time;
 use uuid::Uuid;
+use writer;
 
 static INIT: sync::Once = sync::ONCE_INIT;
 
@@ -66,16 +66,16 @@ pub fn init() {
 pub struct TestDb {
     pub db: Arc<db::Database>,
     pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
-    pub syncer_channel: dir::SyncerChannel,
+    pub syncer_channel: writer::SyncerChannel<::std::fs::File>,
     pub syncer_join: thread::JoinHandle<()>,
-    pub tmpdir: tempdir::TempDir,
+    pub tmpdir: TempDir,
     pub test_camera_uuid: Uuid,
 }
 
 impl TestDb {
     /// Creates a test database with one camera.
     pub fn new() -> TestDb {
-        let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
+        let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();
 
         let mut conn = rusqlite::Connection::open_in_memory().unwrap();
         db::Database::init(&mut conn).unwrap();
@@ -113,7 +113,7 @@ impl TestDb {
         let mut dirs_by_stream_id = FnvHashMap::default();
         dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone());
         let (syncer_channel, syncer_join) =
-            dir::start_syncer(db.clone(), sample_file_dir_id).unwrap();
+            writer::start_syncer(db.clone(), sample_file_dir_id).unwrap();
         TestDb {
             db,
             dirs_by_stream_id: Arc::new(dirs_by_stream_id),
diff --git a/db/writer.rs b/db/writer.rs
new file mode 100644
index 0000000..0e53ab0
--- /dev/null
+++ b/db/writer.rs
@@ -0,0 +1,1022 @@
+// This file is part of Moonfire NVR, a security camera network video recorder.
+// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// In addition, as a special exception, the copyright holders give
+// permission to link the code of portions of this program with the
+// OpenSSL library under certain conditions as described in each
+// individual source file, and distribute linked combinations including
+// the two.
+//
+// You must obey the GNU General Public License in all respects for all
+// of the code used other than OpenSSL. If you modify file(s) with this
+// exception, you may extend this exception to your version of the
+// file(s), but you are not obligated to do so. If you do not wish to do
+// so, delete this exception statement from your version. If you delete
+// this exception statement from all source files in the program, then
+// also delete it here.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+//! Sample file directory management.
+//!
+//! This includes opening files for serving, rotating away old files, and saving new files.
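// A usage sketch of the module below (an editorial illustration, not part of the
// commit; `db`, `dir`, `clocks`, and the various ids are assumed to be set up as
// elsewhere in this diff). `start_syncer` spawns the background flush thread, and
// a `Writer` streams one run of recordings through it:
//
//     let (channel, join) = writer::start_syncer(db.clone(), dir_id)?;
//     // `Arc<dir::SampleFileDir>` implements `DirWriter`; tests can substitute a mock.
//     let mut w = writer::Writer::new(&clocks, &dir, &db, &channel, stream_id,
//                                     video_sample_entry_id);
//     w.write(pkt, local_time, pts_90k, is_key)?;  // retries I/O failures forever
//     w.close(Some(next_pts));                     // hands the recording to the syncer
//     drop(channel);                               // at shutdown, drop all clones...
//     join.join().unwrap();                        // ...then join to persist everything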
+
+use base::clock::{self, Clocks};
+use db::{self, CompositeId};
+use dir;
+use failure::Error;
+use fnv::FnvHashMap;
+use parking_lot::Mutex;
+use recording;
+use openssl::hash;
+use std::cmp;
+use std::io;
+use std::mem;
+use std::os::unix::ffi::OsStrExt;
+use std::sync::Arc;
+use std::sync::mpsc;
+use std::thread;
+
+pub trait DirWriter : 'static + Send {
+    type File : FileWriter;
+
+    fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error>;
+    fn sync(&self) -> Result<(), io::Error>;
+    fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error>;
+}
+
+pub trait FileWriter : 'static {
+    /// As in `std::fs::File::sync_all`.
+    fn sync_all(&self) -> Result<(), io::Error>;
+
+    /// As in `std::io::Writer::write`.
+    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error>;
+}
+
+impl DirWriter for Arc<dir::SampleFileDir> {
+    type File = ::std::fs::File;
+
+    fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error> {
+        dir::SampleFileDir::create_file(self, id)
+    }
+    fn sync(&self) -> Result<(), io::Error> { dir::SampleFileDir::sync(self) }
+    fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
+        dir::SampleFileDir::unlink_file(self, id)
+    }
+}
+
+impl FileWriter for ::std::fs::File {
+    fn sync_all(&self) -> Result<(), io::Error> { self.sync_all() }
+    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { io::Write::write(self, buf) }
+}
+
+/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
+enum SyncerCommand<F> {
+    AsyncSaveRecording(CompositeId, F),
+    DatabaseFlushed,
+    Flush(mpsc::SyncSender<()>),
+}
+
+/// A channel which can be used to send commands to the syncer.
+/// Can be cloned to allow multiple threads to send commands.
+pub struct SyncerChannel<F>(mpsc::Sender<SyncerCommand<F>>);
+
+impl<F> ::std::clone::Clone for SyncerChannel<F> {
+    fn clone(&self) -> Self { SyncerChannel(self.0.clone()) }
+}
+
+/// State of the worker thread.
+struct Syncer<C: Clocks, D: DirWriter> {
+    clocks: C,
+    dir_id: i32,
+    dir: D,
+    db: Arc<db::Database>,
+}
+
+/// Starts a syncer for the given sample file directory.
+///
+/// The lock must not be held on `db` when this is called.
+///
+/// There should be only one syncer per directory, or 0 if operating in read-only mode.
+/// This function will perform the initial rotation synchronously, so that it is finished before
+/// file writing starts. Afterward the syncing happens in a background thread.
+///
+/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
+/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
+/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
+/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
+/// be persisted.
+///
+/// Note that dropping all `SyncerChannel` clones currently includes calling
+/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
+/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
+pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
+                    -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> {
+    let db2 = db.clone();
+    let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
+    syncer.initial_rotation()?;
+    let (snd, rcv) = mpsc::channel();
+    db.lock().on_flush(Box::new({
+        let snd = snd.clone();
+        move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
+            warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
+        }
+    }));
+    Ok((SyncerChannel(snd),
+        thread::Builder::new()
+            .name(format!("sync-{}", path))
+            .spawn(move || syncer.run(rcv)).unwrap()))
+}
+
+pub struct NewLimit {
+    pub stream_id: i32,
+    pub limit: i64,
+}
+
+/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
+/// Note this doesn't change the limit in the database; it only deletes files.
+/// Pass a limit of 0 to delete all recordings associated with a camera.
+pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
+                       -> Result<(), Error> {
+    let db2 = db.clone();
+    let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
+    syncer.do_rotation(|db| {
+        for l in limits {
+            let (bytes_before, extra);
+            {
+                let stream = db.streams_by_id().get(&l.stream_id)
+                               .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
+                bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
+                               stream.bytes_to_delete;
+                extra = stream.retain_bytes - l.limit;
+            }
+            if l.limit >= bytes_before { continue }
+            delete_recordings(db, l.stream_id, extra)?;
+            let stream = db.streams_by_id().get(&l.stream_id).unwrap();
+            info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
+                  stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
+        }
+        Ok(())
+    })
+}
+
+/// Deletes recordings to bring a stream's disk usage within bounds.
+fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
+                     extra_bytes_needed: i64) -> Result<(), Error> {
+    let bytes_needed = {
+        let stream = match db.streams_by_id().get(&stream_id) {
+            None => bail!("no stream {}", stream_id),
+            Some(s) => s,
+        };
+        error!("sample_file_bytes={} to_add={} to_delete={} extra_needed={} retain={}",
+               stream.sample_file_bytes, stream.bytes_to_add, stream.bytes_to_delete,
+               extra_bytes_needed, stream.retain_bytes);
+        stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
+            - stream.retain_bytes
+    };
+    let mut bytes_to_delete = 0;
+    if bytes_needed <= 0 {
+        debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
+        return Ok(());
+    }
+    let mut n = 0;
+    db.delete_oldest_recordings(stream_id, &mut |row| {
+        if bytes_needed >= bytes_to_delete {
+            bytes_to_delete += row.sample_file_bytes as i64;
+            n += 1;
+            return true;
+        }
+        false
+    })?;
+    info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
+          stream_id, bytes_to_delete, n, bytes_needed);
+    Ok(())
+}
+
+impl<F: FileWriter> SyncerChannel<F> {
+    /// Asynchronously syncs the given writer, closes it, records it into the database, and
+    /// starts rotation.
+    fn async_save_recording(&self, id: CompositeId, f: F) {
+        self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap();
+    }
+
+    /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
+    pub fn flush(&self) {
+        let (snd, rcv) = mpsc::sync_channel(0);
+        self.0.send(SyncerCommand::Flush(snd)).unwrap();
+        rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
+    }
+}
+
+impl Syncer<clock::RealClocks, Arc<dir::SampleFileDir>> {
+    fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
+           -> Result<(Self, String), Error> {
+        let d = l.sample_file_dirs_by_id()
+                 .get(&dir_id)
+                 .ok_or_else(|| format_err!("no dir {}", dir_id))?;
+        let dir = d.get()?;
+
+        // Abandon files.
+        // First, get a list of the streams in question.
+        let streams_to_next: FnvHashMap<_, _> =
+            l.streams_by_id()
+             .iter()
+             .filter_map(|(&k, v)| {
+                 if v.sample_file_dir_id == Some(dir_id) {
+                     Some((k, v.next_recording_id))
+                 } else {
+                     None
+                 }
+             })
+             .collect();
+        let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
+        let mut undeletable = 0;
+        for &id in &to_abandon {
+            if let Err(e) = dir.unlink_file(id) {
+                if e.kind() == io::ErrorKind::NotFound {
+                    warn!("dir: abandoned recording {} already deleted!", id);
+                } else {
+                    warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
+                    undeletable += 1;
+                }
+            }
+        }
+        if undeletable > 0 {
+            bail!("Unable to delete {} abandoned recordings.", undeletable);
+        }
+
+        Ok((Syncer {
+            clocks: clock::RealClocks{},
+            dir_id,
+            dir,
+            db,
+        }, d.path.clone()))
+    }
+
+    /// Lists files which should be "abandoned" (deleted without ever recording in the database)
+    /// on opening.
+    fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
+                             -> Result<Vec<CompositeId>, Error> {
+        let mut v = Vec::new();
+        for e in ::std::fs::read_dir(path)? {
+            let e = e?;
+            let id = match dir::parse_id(e.file_name().as_bytes()) {
+                Ok(i) => i,
+                Err(_) => continue,
+            };
+            let next = match streams_to_next.get(&id.stream()) {
+                Some(n) => *n,
+                None => continue, // unknown stream.
+            };
+            if id.recording() >= next {
+                v.push(id);
+            }
+        }
+        Ok(v)
+    }
+
+    /// Rotates files for all streams and deletes stale files from previous runs.
+    /// Called from main thread.
+    fn initial_rotation(&mut self) -> Result<(), Error> {
+        self.do_rotation(|db| {
+            let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
+            for &stream_id in &streams {
+                delete_recordings(db, stream_id, 0)?;
+            }
+            Ok(())
+        })
+    }
+
+    /// Helper to do initial or retention-lowering rotation. Called from main thread.
+    fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
+    where F: Fn(&mut db::LockedDatabase) -> Result<(), Error> {
+        {
+            let mut db = self.db.lock();
+            delete_recordings(&mut *db)?;
+            db.flush("synchronous deletion")?;
+        }
+        let mut garbage: Vec<_> = {
+            let l = self.db.lock();
+            let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
+            d.garbage.iter().map(|id| *id).collect()
+        };
+        if !garbage.is_empty() {
+            // Try to delete files; retain ones in `garbage` that don't exist.
+            let mut errors = 0;
+            for &id in &garbage {
+                if let Err(e) = self.dir.unlink_file(id) {
+                    if e.kind() != io::ErrorKind::NotFound {
+                        warn!("dir: Unable to unlink {}: {}", id, e);
+                        errors += 1;
+                    }
+                }
+            }
+            if errors > 0 {
+                bail!("Unable to unlink {} files (see earlier warning messages for details)",
+                      errors);
+            }
+            self.dir.sync()?;
+            self.db.lock().delete_garbage(self.dir_id, &mut garbage)?;
+            self.db.lock().flush("synchronous garbage collection")?;
+        }
+        Ok(())
+    }
+}
+
+impl<C: Clocks, D: DirWriter> Syncer<C, D> {
+    fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand<D::File>>) {
+        loop {
+            match cmds.recv() {
+                Err(_) => return, // all senders have closed the channel; shutdown
+                Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f),
+                Ok(SyncerCommand::DatabaseFlushed) => self.collect_garbage(),
+                Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
+            };
+        }
+    }
+
+    /// Collects garbage (without forcing a sync). Called from worker thread.
+    fn collect_garbage(&mut self) {
+        let mut garbage: Vec<_> = {
+            let l = self.db.lock();
+            let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
+            d.garbage.iter().map(|id| *id).collect()
+        };
+        if garbage.is_empty() {
+            return;
+        }
+        let c = &self.clocks;
+        for &id in &garbage {
+            c.retry_forever(&mut || {
+                if let Err(e) = self.dir.unlink_file(id) {
+                    if e.kind() == io::ErrorKind::NotFound {
+                        warn!("dir: recording {} already deleted!", id);
+                        return Ok(());
+                    }
+                    return Err(e);
+                }
+                Ok(())
+            });
+        }
+        c.retry_forever(&mut || self.dir.sync());
+        c.retry_forever(&mut || self.db.lock().delete_garbage(self.dir_id, &mut garbage));
+    }
+
+    /// Saves the given recording and causes rotation to happen. Called from worker thread.
+    ///
+    /// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
+    /// so that there can be only one dir sync and database transaction per save.
+    /// Internal helper for `save`. This is separated out so that the question-mark operator
+    /// can be used in the many error paths.
+    fn save(&mut self, id: CompositeId, f: D::File) {
+        let stream_id = id.stream();
+
+        // Free up a like number of bytes.
+        self.clocks.retry_forever(&mut || f.sync_all());
+        self.clocks.retry_forever(&mut || self.dir.sync());
+        let mut db = self.db.lock();
+        db.mark_synced(id).unwrap();
+        delete_recordings(&mut db, stream_id, 0).unwrap();
+        let reason = {
+            let s = db.streams_by_id().get(&stream_id).unwrap();
+            let c = db.cameras_by_id().get(&s.camera_id).unwrap();
+            let unflushed = s.unflushed();
+            if unflushed < s.flush_if {
+                debug!("{}-{}: unflushed={} < if={}, not flushing",
+                       c.short_name, s.type_.as_str(), unflushed, s.flush_if);
+                return;
+            }
+            format!("{}-{}: unflushed={} >= if={}",
+                    c.short_name, s.type_.as_str(), unflushed, s.flush_if)
+        };
+
+        if let Err(e) = db.flush(&reason) {
+            // Don't retry the commit now in case it causes extra flash write cycles.
+            // It's not necessary for correctness to flush before proceeding.
+            // Just wait until the next flush would happen naturally.
+            warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}",
+                  reason, e);
+        }
+    }
+}
+
+/// Struct for writing a single run (of potentially several recordings) to disk and committing its
+/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
+/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
+/// or panics (if further writing on this stream is impossible).
+pub struct Writer<'a, C: Clocks, D: DirWriter> {
+    clocks: &'a C,
+    dir: &'a D,
+    db: &'a db::Database,
+    channel: &'a SyncerChannel<D::File>,
+    stream_id: i32,
+    video_sample_entry_id: i32,
+    state: WriterState<D::File>,
+}
+
+enum WriterState<F: FileWriter> {
+    Unopened,
+    Open(InnerWriter<F>),
+    Closed(PreviousWriter),
+}
+
+/// State for writing a single recording, used within `Writer`.
+///
+/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel`
+/// with at least one sample. The sample may have zero duration.
+struct InnerWriter<F: FileWriter> {
+    f: F,
+    r: Arc<Mutex<db::RecordingToInsert>>,
+    e: recording::SampleIndexEncoder,
+    id: CompositeId,
+    hasher: hash::Hasher,
+
+    /// The start time of this segment, based solely on examining the local clock after frames in
+    /// this segment were received. Frames can suffer from various kinds of delay (initial
+    /// buffering, encoding, and network transmission), so this time is set to far in the future on
+    /// construction, given a real value on the first packet, and decreased as less-delayed packets
+    /// are discovered. See design/time.md for details.
+    local_start: recording::Time,
+
+    adjuster: ClockAdjuster,
+
+    /// A sample which has been written to disk but not added to `index`. Index writes are one
+    /// sample behind disk writes because the duration of a sample is the difference between its
+    /// pts and the next sample's pts. A sample is flushed when the next sample is written, when
+    /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
+    /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
+    ///
+    /// Invariant: this should always be `Some` (briefly violated during `write` call only).
+    unflushed_sample: Option<UnflushedSample>,
+}
+
+/// Adjusts durations given by the camera to correct its clock frequency error.
+#[derive(Copy, Clone, Debug)]
+struct ClockAdjuster {
+    /// Every `every_minus_1 + 1` units, add `-ndir`.
+    /// Note i32::max_value() disables adjustment.
+    every_minus_1: i32,
+
+    /// Should be 1 or -1 (unless disabled).
+    ndir: i32,
+
+    /// Keeps accumulated difference from previous values.
+    cur: i32,
+}
+
+impl ClockAdjuster {
+    fn new(local_time_delta: Option<i64>) -> Self {
+        // Pick an adjustment rate to correct local_time_delta over the next minute (the
+        // desired duration of a single recording). Cap the rate at 500 ppm (which corrects
+        // 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing
+        // down playback.
+        let (every_minus_1, ndir) = match local_time_delta {
+            Some(d) if d <= -2700 => (1999, 1),
+            Some(d) if d >= 2700 => (1999, -1),
+            Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1),
+            Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1),
+            _ => (i32::max_value(), 0),
+        };
+        ClockAdjuster{
+            every_minus_1,
+            ndir,
+            cur: 0,
+        }
+    }
+
+    fn adjust(&mut self, mut val: i32) -> i32 {
+        self.cur += val;
+
+        // The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
+        // cause a duration of 1 to become a duration of 0. It has no effect when increasing
+        // durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
+        // be newly > self.every_minus_1.)
+        while self.cur > self.every_minus_1 && val > self.ndir {
+            val -= self.ndir;
+            self.cur -= self.every_minus_1 + 1;
+        }
+        val
+    }
+}
+
+#[derive(Copy, Clone)]
+struct UnflushedSample {
+    local_time: recording::Time,
+    pts_90k: i64,
+    len: i32,
+    is_key: bool,
+}
+
+/// State associated with a run's previous recording; used within `Writer`.
+#[derive(Copy, Clone)]
+struct PreviousWriter {
+    end: recording::Time,
+    local_time_delta: recording::Duration,
+    run_offset: i32,
+}
+
+impl<'a, C: Clocks, D: DirWriter> Writer<'a, C, D> {
+    pub fn new(clocks: &'a C, dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel<D::File>,
+               stream_id: i32, video_sample_entry_id: i32) -> Self {
+        Writer {
+            clocks,
+            dir,
+            db,
+            channel,
+            stream_id,
+            video_sample_entry_id,
+            state: WriterState::Unopened,
+        }
+    }
+
+    /// Opens a new writer.
+    /// This returns a writer that violates the invariant that `unflushed_sample` is `Some`.
+    /// The caller (`write`) is responsible for correcting this.
+    fn open(&mut self) -> Result<&mut InnerWriter<D::File>, Error> {
+        let prev = match self.state {
+            WriterState::Unopened => None,
+            WriterState::Open(ref mut w) => return Ok(w),
+            WriterState::Closed(prev) => Some(prev),
+        };
+        let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert {
+            run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
+            start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())),
+            video_sample_entry_id: self.video_sample_entry_id,
+            flags: db::RecordingFlags::Growing as i32,
+            ..Default::default()
+        })?;
+        let f = self.clocks.retry_forever(&mut || self.dir.create_file(id));
+
+        self.state = WriterState::Open(InnerWriter {
+            f,
+            r,
+            e: recording::SampleIndexEncoder::new(),
+            id,
+            hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
+            local_start: recording::Time(i64::max_value()),
+            adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
+            unflushed_sample: None,
+        });
+        match self.state {
+            WriterState::Open(ref mut w) => Ok(w),
+            _ => unreachable!(),
+        }
+    }
+
+    pub fn previously_opened(&self) -> Result<bool, Error> {
+        Ok(match self.state {
+            WriterState::Unopened => false,
+            WriterState::Closed(_) => true,
+            WriterState::Open(_) => bail!("open!"),
+        })
+    }
+
+    /// Writes a new frame to this segment.
+    /// `local_time` should be the local clock's time as of when this packet was received.
+    pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
+                 is_key: bool) -> Result<(), Error> {
+        let w = self.open()?;
+
+        // Note w's invariant that `unflushed_sample` is `None` may currently be violated.
+        // We must restore it on all success or error paths.
+
+        if let Some(unflushed) = w.unflushed_sample.take() {
+            let duration = (pts_90k - unflushed.pts_90k) as i32;
+            if duration <= 0 {
+                // Restore invariant.
+                w.unflushed_sample = Some(unflushed);
+                bail!("pts not monotonically increasing; got {} then {}",
+                      unflushed.pts_90k, pts_90k);
+            }
+            let duration = w.adjuster.adjust(duration);
+            w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
+        }
+        let mut remaining = pkt;
+        while !remaining.is_empty() {
+            let written = self.clocks.retry_forever(&mut || w.f.write(remaining));
+            remaining = &remaining[written..];
+        }
+        w.unflushed_sample = Some(UnflushedSample {
+            local_time,
+            pts_90k,
+            len: pkt.len() as i32,
+            is_key,
+        });
+        w.hasher.update(pkt).unwrap();
+        Ok(())
+    }
+
+    /// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
+    /// duration (if known). If `close` is not called, the `Drop` trait impl will close the writer,
+    /// swallowing errors and using a zero duration for the last sample.
+    pub fn close(&mut self, next_pts: Option<i64>) {
+        self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
+            WriterState::Open(w) => {
+                let prev = w.close(self.channel, next_pts);
+                WriterState::Closed(prev)
+            },
+            s => s,
+        };
+    }
+}
+
+impl<F: FileWriter> InnerWriter<F> {
+    fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
+                  pkt_local_time: recording::Time) {
+        let mut l = self.r.lock();
+        self.e.add_sample(duration_90k, bytes, is_key, &mut l);
+        let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
+        self.local_start = cmp::min(self.local_start, new);
+        if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
+
+impl<F: FileWriter> InnerWriter<F> {
+    fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
+                  pkt_local_time: recording::Time) {
+        let mut l = self.r.lock();
+        self.e.add_sample(duration_90k, bytes, is_key, &mut l);
+        let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
+        self.local_start = cmp::min(self.local_start, new);
+        if l.run_offset == 0 {  // start time isn't anchored to previous recording's end; adjust.
+            l.start = self.local_start;
+        }
+    }
+
+    fn close(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>) -> PreviousWriter {
+        let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
+        let (duration, flags) = match next_pts {
+            None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
+            Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
+        };
+        let mut sha1_bytes = [0u8; 20];
+        sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
+        let (local_time_delta, run_offset, end);
+        self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
+        {
+            let mut l = self.r.lock();
+            l.flags = flags;
+            local_time_delta = self.local_start - l.start;
+            l.local_time_delta = local_time_delta;
+            l.sample_file_sha1 = sha1_bytes;
+            run_offset = l.run_offset;
+            end = l.start + recording::Duration(l.duration_90k as i64);
+        }
+        drop(self.r);
+        channel.async_save_recording(self.id, self.f);
+        PreviousWriter {
+            end,
+            local_time_delta,
+            run_offset,
+        }
+    }
+}
+
+impl<'a, C: Clocks, D: DirWriter> Drop for Writer<'a, C, D> {
+    fn drop(&mut self) {
+        if ::std::thread::panicking() {
+            // This will probably panic again. Don't do it.
+            return;
+        }
+        if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) {
+            // Swallow any error. The caller should only drop the Writer without calling close()
+            // if there's already been an error. The caller should report that. No point in
+            // complaining again.
+            let _ = w.close(self.channel, None);
+        }
+    }
+}
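
One detail worth spelling out in the `Drop` impl above: `close` can itself panic (e.g., the `expect` on the unflushed sample), and panicking inside `drop` while the thread is already unwinding aborts the process, hence the `::std::thread::panicking()` guard. A tiny self-contained illustration of the pattern (the `Guard` type and `main` are mine, not from the diff):

    // Drop-during-panic guard: skip cleanup that could panic again when the
    // thread is already unwinding, since a double panic aborts the process.
    struct Guard;

    impl Drop for Guard {
        fn drop(&mut self) {
            if std::thread::panicking() {
                return;  // a second panic here would abort
            }
            println!("normal cleanup");
        }
    }

    fn main() {
        let result = std::panic::catch_unwind(|| {
            let _g = Guard;
            panic!("boom");
        });
        assert!(result.is_err());  // Guard::drop ran but skipped its cleanup
    }
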
+
+#[cfg(test)]
+mod tests {
+    use base::clock::SimulatedClocks;
+    use db::{self, CompositeId};
+    use parking_lot::Mutex;
+    use recording;
+    use std::collections::VecDeque;
+    use std::io;
+    use std::sync::Arc;
+    use std::sync::mpsc;
+    use super::{ClockAdjuster, Writer};
+    use testutil;
+
+    #[derive(Clone)]
+    struct MockDir(Arc<Mutex<VecDeque<MockDirAction>>>);
+
+    enum MockDirAction {
+        Create(CompositeId, Box<Fn(CompositeId) -> Result<MockFile, io::Error> + Send>),
+        Sync(Box<Fn() -> Result<(), io::Error> + Send>),
+        Unlink(CompositeId, Box<Fn(CompositeId) -> Result<(), io::Error> + Send>),
+    }
+
+    impl MockDir {
+        fn new() -> Self { MockDir(Arc::new(Mutex::new(VecDeque::new()))) }
+        fn expect(&self, action: MockDirAction) { self.0.lock().push_back(action); }
+        fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); }
+    }
+
+    impl super::DirWriter for MockDir {
+        type File = MockFile;
+
+        fn create_file(&self, id: CompositeId) -> Result<Self::File, io::Error> {
+            match self.0.lock().pop_front().expect("got create_file with no expectation") {
+                MockDirAction::Create(expected_id, ref f) => {
+                    assert_eq!(id, expected_id);
+                    f(id)
+                },
+                _ => panic!("got create_file({}), expected something else", id),
+            }
+        }
+        fn sync(&self) -> Result<(), io::Error> {
+            match self.0.lock().pop_front().expect("got sync with no expectation") {
+                MockDirAction::Sync(f) => f(),
+                _ => panic!("got sync, expected something else"),
+            }
+        }
+        fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
+            match self.0.lock().pop_front().expect("got unlink_file with no expectation") {
+                MockDirAction::Unlink(expected_id, f) => {
+                    assert_eq!(id, expected_id);
+                    f(id)
+                },
+                _ => panic!("got unlink({}), expected something else", id),
+            }
+        }
+    }
+
+    impl Drop for MockDir {
+        fn drop(&mut self) {
+            if !::std::thread::panicking() {
+                assert_eq!(self.0.lock().len(), 0);
+            }
+        }
+    }
+
+    #[derive(Clone)]
+    struct MockFile(Arc<Mutex<VecDeque<MockFileAction>>>);
+
+    enum MockFileAction {
+        SyncAll(Box<Fn() -> Result<(), io::Error> + Send>),
+        Write(Box<Fn(&[u8]) -> Result<usize, io::Error> + Send>),
+    }
+
+    impl MockFile {
+        fn new() -> Self { MockFile(Arc::new(Mutex::new(VecDeque::new()))) }
+        fn expect(&self, action: MockFileAction) { self.0.lock().push_back(action); }
+        fn ensure_done(&self) { assert_eq!(self.0.lock().len(), 0); }
+    }
+
+    impl super::FileWriter for MockFile {
+        fn sync_all(&self) -> Result<(), io::Error> {
+            match self.0.lock().pop_front().expect("got sync_all with no expectation") {
+                MockFileAction::SyncAll(f) => f(),
+                _ => panic!("got sync_all, expected something else"),
+            }
+        }
+        fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+            match self.0.lock().pop_front().expect("got write with no expectation") {
+                MockFileAction::Write(f) => f(buf),
+                _ => panic!("got write({:?}), expected something else", buf),
+            }
+        }
+    }
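
The tests below script each mock with failure-then-success pairs, and they still run instantly because retries sleep on the `Clocks` abstraction rather than the wall clock: `SimulatedClocks::sleep` merely advances a counter. A self-contained sketch of that interaction (`FakeClock` is a hypothetical stand-in; the real `Clocks::retry_forever` takes a `&mut FnMut` trait object, generic here for brevity):

    use std::cell::Cell;
    use std::fmt::Debug;

    struct FakeClock { uptime_secs: Cell<i64> }

    impl FakeClock {
        // Like SimulatedClocks::sleep: advance a counter instead of blocking.
        fn sleep(&self, secs: i64) { self.uptime_secs.set(self.uptime_secs.get() + secs); }

        // Same shape as Clocks::retry_forever: loop until success, sleeping
        // one (virtual) second after each failure.
        fn retry_forever<T, E: Debug, F: FnMut() -> Result<T, E>>(&self, f: &mut F) -> T {
            loop {
                match f() {
                    Ok(t) => return t,
                    Err(e) => {
                        eprintln!("sleeping for 1s after error: {:?}", e);
                        self.sleep(1);
                    },
                }
            }
        }
    }

    fn main() {
        let clock = FakeClock { uptime_secs: Cell::new(0) };
        let mut attempts = 0;
        let v = clock.retry_forever(&mut || {
            attempts += 1;
            if attempts < 3 { Err("transient I/O error") } else { Ok(42) }
        });
        assert_eq!(v, 42);
        assert_eq!(clock.uptime_secs.get(), 2); // two failures, two virtual sleeps
    }
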
+
+    struct Harness {
+        clocks: SimulatedClocks,
+        db: Arc<db::Database>,
+        dir_id: i32,
+        _tmpdir: ::tempdir::TempDir,
+        dir: MockDir,
+        channel: super::SyncerChannel<MockFile>,
+        join: ::std::thread::JoinHandle<()>,
+    }
+
+    fn new_harness() -> Harness {
+        let tdb = testutil::TestDb::new();
+        let dir_id = *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap();
+
+        // This starts a real fs-backed syncer. Get rid of it.
+        tdb.db.lock().clear_on_flush();
+        drop(tdb.syncer_channel);
+        tdb.syncer_join.join().unwrap();
+
+        // Start a mock syncer.
+        let clocks = SimulatedClocks::new(::time::Timespec::new(0, 0));
+        let dir = MockDir::new();
+        let mut syncer = super::Syncer {
+            clocks: clocks.clone(),
+            dir_id: *tdb.db.lock().sample_file_dirs_by_id().keys().next().unwrap(),
+            dir: dir.clone(),
+            db: tdb.db.clone(),
+        };
+        let (snd, rcv) = mpsc::channel();
+        tdb.db.lock().on_flush(Box::new({
+            let snd = snd.clone();
+            move || if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) {
+                warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
+            }
+        }));
+        let join = ::std::thread::Builder::new()
+            .name("mock-syncer".to_owned())
+            .spawn(move || syncer.run(rcv)).unwrap();
+
+        Harness {
+            clocks,
+            dir_id,
+            dir,
+            db: tdb.db,
+            _tmpdir: tdb.tmpdir,
+            channel: super::SyncerChannel(snd),
+            join,
+        }
+    }
+
+    fn eio() -> io::Error { io::Error::new(io::ErrorKind::Other, "got EIO") }
+
+    #[test]
+    fn write_path_retries() {
+        testutil::init();
+        let h = new_harness();
+        let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
+            1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
+        {
+            let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                    video_sample_entry_id);
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(eio()))));
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| {
+                assert_eq!(buf, b"1234");
+                Err(eio())
+            })));
+            f.expect(MockFileAction::Write(Box::new(|buf| {
+                assert_eq!(buf, b"1234");
+                Ok(1)
+            })));
+            f.expect(MockFileAction::Write(Box::new(|buf| {
+                assert_eq!(buf, b"234");
+                Err(eio())
+            })));
+            f.expect(MockFileAction::Write(Box::new(|buf| {
+                assert_eq!(buf, b"234");
+                Ok(3)
+            })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio()))));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"1234", recording::Time(1), 0, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            drop(w);
+            h.channel.flush();
+            f.ensure_done();
+            h.dir.ensure_done();
+        }
+
+        {
+            let l = h.db.lock();
+            let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
+            assert_eq!(s.bytes_to_add, 0);
+            assert_eq!(s.sample_file_bytes, 4);
+        }
+        drop(h.channel);
+        h.db.lock().clear_on_flush();
+        h.join.join().unwrap();
+    }
+
+    #[test]
+    fn gc_path_retries() {
+        testutil::init();
+        let h = new_harness();
+        h.db.lock().update_retention(&[db::RetentionChange {
+            stream_id: testutil::TEST_STREAM_ID,
+            new_record: true,
+            new_limit: 3,
+        }]).unwrap();
+
+        // Setup: add a 3-byte recording.
+        let video_sample_entry_id = h.db.lock().insert_video_sample_entry(
+            1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
+        {
+            let mut w = Writer::new(&h.clocks, &h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
+                                    video_sample_entry_id);
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
+                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"123", recording::Time(2), 0, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            w.close(Some(1));
+            h.channel.flush();
+            f.ensure_done();
+            h.dir.ensure_done();
+
+            // Then a 1-byte recording.
+            let f = MockFile::new();
+            h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
+                         Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
+            f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
+            f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
+            w.write(b"4", recording::Time(3), 1, true).unwrap();
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
+                let db = h.db.clone();
+                move |_| {
+                    // The drop(w) below should cause the old recording to be deleted (moved to
+                    // garbage). When the database is flushed, the syncer forces garbage collection
+                    // including this unlink.
+
+                    // This should have already applied the changes to sample file bytes, even
+                    // though the garbage has yet to be collected.
+                    let l = db.lock();
+                    let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap();
+                    assert_eq!(s.bytes_to_delete, 0);
+                    assert_eq!(s.bytes_to_add, 0);
+                    assert_eq!(s.sample_file_bytes, 1);
+                    Err(eio())  // force a retry.
+                }
+            })));
+            h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Err(eio()))));
+            h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
+            drop(w);
+            h.channel.flush();  // wait until the Save...
+            h.channel.flush();  // ...and the DatabaseFlushed are processed.
+            f.ensure_done();
+            h.dir.ensure_done();
+        }
+
+        // Garbage should be marked collected on the next flush.
+        {
+            let mut l = h.db.lock();
+            assert!(!l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+            l.flush("forced gc").unwrap();
+            assert!(l.sample_file_dirs_by_id().get(&h.dir_id).unwrap().garbage.is_empty());
+        }
+
+        // The syncer should shut down cleanly.
+        drop(h.channel);
+        h.db.lock().clear_on_flush();
+        h.join.join().unwrap();
+    }
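
The expected values in the `adjust` test below can be checked by hand; this throwaway arithmetic (not part of the test suite) shows where the 540-unit and 2,700-unit totals come from:

    fn main() {
        // typical case: Some(-540) picks one unit per (60*90000)/540 = 10,000
        let every = (60 * 90000) / 540;
        assert_eq!(every, 10_000); // ~100 ppm
        // so 1800 samples of 3000 units lose 5,400,000 / 10,000 = 540 units
        assert_eq!(1800 * 3000 / every, 540);
        // cap: 2,700 units per 5,400,000-unit minute = exactly 500 ppm
        assert_eq!(2_700_i64 * 1_000_000 / (60 * 90_000), 500);
        println!("expected totals check out");
    }
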
+
+    #[test]
+    fn adjust() {
+        testutil::init();
+
+        // no-ops.
+        for v in &[None, Some(0), Some(-10), Some(10)] {
+            let mut a = ClockAdjuster::new(*v);
+            for _ in 0..1800 {
+                assert_eq!(3000, a.adjust(3000), "v={:?}", *v);
+            }
+        }
+
+        // typical, 100 ppm adjustment.
+        let mut a = ClockAdjuster::new(Some(-540));
+        let mut total = 0;
+        for _ in 0..1800 {
+            let new = a.adjust(3000);
+            assert!(new == 2999 || new == 3000);
+            total += new;
+        }
+        let expected = 1800*3000 - 540;
+        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
+                total, expected);
+
+        a = ClockAdjuster::new(Some(540));
+        let mut total = 0;
+        for _ in 0..1800 {
+            let new = a.adjust(3000);
+            assert!(new == 3000 || new == 3001);
+            total += new;
+        }
+        let expected = 1800*3000 + 540;
+        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
+                total, expected);
+
+        // capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
+        a = ClockAdjuster::new(Some(-1_000_000));
+        total = 0;
+        for _ in 0..1800 {
+            let new = a.adjust(3000);
+            assert!(new == 2998 || new == 2999, "new={}", new);
+            total += new;
+        }
+        let expected = 1800*3000 - 2700;
+        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
+                total, expected);
+
+        a = ClockAdjuster::new(Some(1_000_000));
+        total = 0;
+        for _ in 0..1800 {
+            let new = a.adjust(3000);
+            assert!(new == 3001 || new == 3002, "new={}", new);
+            total += new;
+        }
+        let expected = 1800*3000 + 2700;
+        assert!(total == expected || total == expected + 1, "total={} vs expected={}",
+                total, expected);
+    }
+}
diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs
index 1346ab9..3b30f72 100644
--- a/src/cmds/config/cameras.rs
+++ b/src/cmds/config/cameras.rs
@@ -33,7 +33,7 @@ extern crate cursive;
 use self::cursive::Cursive;
 use self::cursive::traits::{Boxable, Identifiable, Finder};
 use self::cursive::views;
-use db::{self, dir};
+use db::{self, writer};
 use failure::Error;
 use std::collections::BTreeMap;
 use std::str::FromStr;
@@ -188,7 +188,7 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, to_delet
             None => continue,
         };
         let l = zero_limits.entry(dir_id).or_insert_with(|| Vec::with_capacity(2));
-        l.push(dir::NewLimit {
+        l.push(writer::NewLimit {
             stream_id,
             limit: 0,
         });
@@ -209,12 +209,12 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc<db::Database>, id: i32, to_delet
     }
 }
 
-fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<dir::NewLimit>>)
+fn lower_retention(db: &Arc<db::Database>, zero_limits: BTreeMap<i32, Vec<writer::NewLimit>>)
     -> Result<(), Error> {
     let dirs_to_open: Vec<_> = zero_limits.keys().map(|id| *id).collect();
     db.lock().open_sample_file_dirs(&dirs_to_open[..])?;
     for (&dir_id, l) in &zero_limits {
-        dir::lower_retention(db.clone(), dir_id, &l)?;
+        writer::lower_retention(db.clone(), dir_id, &l)?;
     }
     Ok(())
 }
diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs
index 7a31280..373dc06 100644
--- a/src/cmds/config/dirs.rs
+++ b/src/cmds/config/dirs.rs
@@ -33,7 +33,7 @@ extern crate cursive;
 use self::cursive::Cursive;
 use self::cursive::traits::{Boxable, Identifiable};
 use self::cursive::views;
-use db::{self, dir};
+use db::{self, writer};
 use failure::Error;
 use std::cell::RefCell;
 use std::collections::BTreeMap;
@@ -142,7 +142,7 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
     let model = &*model.borrow();
     let new_limits: Vec<_> = model.streams.iter()
-        .map(|(&id, s)| dir::NewLimit {stream_id: id, limit: s.retain.unwrap()})
+        .map(|(&id, s)| writer::NewLimit {stream_id: id, limit: s.retain.unwrap()})
         .collect();
     siv.pop_layer(); // deletion confirmation
     siv.pop_layer(); // retention dialog
@@ -150,7 +150,7 @@ fn actually_delete(model: &RefCell<Model>, siv: &mut Cursive) {
         let mut l = model.db.lock();
         l.open_sample_file_dirs(&[model.dir_id]).unwrap(); // TODO: don't unwrap.
     }
-    if let Err(e) = dir::lower_retention(model.db.clone(), model.dir_id, &new_limits[..]) {
+    if let Err(e) = writer::lower_retention(model.db.clone(), model.dir_id, &new_limits[..]) {
         siv.add_layer(views::Dialog::text(format!("Unable to delete excess video: {}", e))
             .title("Error")
             .dismiss_button("Abort"));
diff --git a/src/cmds/run.rs b/src/cmds/run.rs
index 73cee32..033e742 100644
--- a/src/cmds/run.rs
+++ b/src/cmds/run.rs
@@ -29,7 +29,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use clock;
-use db::{self, dir};
+use db::{self, dir, writer};
 use failure::Error;
 use fnv::FnvHashMap;
 use futures::{Future, Stream};
@@ -90,7 +90,7 @@ fn resolve_zone() -> String {
 
 struct Syncer {
     dir: Arc<dir::SampleFileDir>,
-    channel: dir::SyncerChannel,
+    channel: writer::SyncerChannel<::std::fs::File>,
     join: thread::JoinHandle<()>,
 }
@@ -122,7 +122,7 @@ pub fn run() -> Result<(), Error> {
     let streams = l.streams_by_id().len();
     let env = streamer::Environment {
         db: &db,
-        clocks: &clock::REAL,
+        clocks: &clock::RealClocks{},
         opener: &*stream::FFMPEG,
         shutdown: &shutdown_streamers,
     };
@@ -142,7 +142,7 @@ pub fn run() -> Result<(), Error> {
     drop(l);
     let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default());
     for (id, dir) in dirs.drain() {
-        let (channel, join) = dir::start_syncer(db.clone(), id)?;
+        let (channel, join) = writer::start_syncer(db.clone(), id)?;
         syncers.insert(id, Syncer {
             dir,
             channel,
diff --git a/src/main.rs b/src/main.rs
index 98f2bf5..88f0e6f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -46,6 +46,7 @@ extern crate reffers;
 extern crate rusqlite;
 extern crate memmap;
 extern crate mime;
+extern crate moonfire_base as base;
 extern crate moonfire_db as db;
 extern crate moonfire_ffmpeg;
 extern crate mylog;
@@ -62,7 +63,8 @@ extern crate tokio_signal;
 extern crate url;
 extern crate uuid;
 
-mod clock;
+use base::clock as clock;
+
 mod cmds;
 mod h264;
 mod json;
diff --git a/src/mp4.rs b/src/mp4.rs
index 7a35f80..71845f9 100644
--- a/src/mp4.rs
+++ b/src/mp4.rs
@@ -1448,7 +1448,7 @@ impl FileInner {
         let f = self.dirs_by_stream_id
                     .get(&s.s.id.stream())
                     .ok_or_else(|| format_err!("{}: stream not found", s.s.id))?
-                    .open_sample_file(s.s.id)?;
+                    .open_file(s.s.id)?;
         let start = s.s.sample_file_range().start + r.start;
         let mmap = Box::new(unsafe {
             memmap::MmapOptions::new()
@@ -1520,6 +1520,7 @@ mod tests {
     use byteorder::{BigEndian, ByteOrder};
     use db::recording::{self, TIME_UNITS_PER_SEC};
     use db::testutil::{self, TestDb, TEST_STREAM_ID};
+    use db::writer;
     use futures::Future;
     use futures::Stream as FuturesStream;
     use hyper::header;
@@ -1755,8 +1756,9 @@ mod tests {
             extra_data.width, extra_data.height, extra_data.sample_entry,
             extra_data.rfc6381_codec).unwrap();
         let dir = db.dirs_by_stream_id.get(&TEST_STREAM_ID).unwrap();
-        let mut output = dir::Writer::new(dir, &db.db, &db.syncer_channel, TEST_STREAM_ID,
-                                          video_sample_entry_id);
+        let mut output = writer::Writer::new(&::base::clock::RealClocks{}, dir, &db.db,
+                                             &db.syncer_channel, TEST_STREAM_ID,
+                                             video_sample_entry_id);
 
         // end_pts is the pts of the end of the most recent frame (start + duration).
         // It's needed because dir::Writer calculates a packet's duration from its pts and the
diff --git a/src/streamer.rs b/src/streamer.rs
index 4166281..012b85e 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -29,7 +29,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
use clock::{Clocks, TimerGuard}; -use db::{Camera, Database, Stream, dir, recording}; +use db::{Camera, Database, Stream, dir, recording, writer}; use failure::Error; use h264; use std::result::Result; @@ -48,7 +48,7 @@ pub struct Environment<'a, 'b, C, S> where C: 'a + Clocks, S: 'a + stream::Strea pub shutdown: &'b Arc, } -pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { +pub struct Streamer<'a, C, S> where C: Clocks, S: 'a + stream::Stream { shutdown: Arc, // State below is only used by the thread in Run. @@ -56,7 +56,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { rotate_interval_sec: i64, db: Arc, dir: Arc, - syncer_channel: dir::SyncerChannel, + syncer_channel: writer::SyncerChannel<::std::fs::File>, clocks: &'a C, opener: &'a stream::Opener, stream_id: i32, @@ -67,7 +67,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, dir: Arc, - syncer_channel: dir::SyncerChannel, + syncer_channel: writer::SyncerChannel<::std::fs::File>, stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64, rotate_interval_sec: i64) -> Self { Streamer { @@ -121,8 +121,8 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { // Seconds since epoch at which to next rotate. let mut rotate: Option = None; let mut transformed = Vec::new(); - let mut w = dir::Writer::new(&self.dir, &self.db, &self.syncer_channel, self.stream_id, - video_sample_entry_id); + let mut w = writer::Writer::new(self.clocks, &self.dir, &self.db, &self.syncer_channel, + self.stream_id, video_sample_entry_id); while !self.shutdown.load(Ordering::SeqCst) { let pkt = { let _t = TimerGuard::new(self.clocks, || "getting next packet");