tests and fixes for Writer and Syncer

* separate these out into a new file, writer.rs, as dir.rs was getting
  unwieldy.
* extract traits for the parts of SampleFileDir and std::fs::File they needed;
  set up mock implementations.
* move clock.rs to a new base crate to be accessible from the db crate.
* add tests that exercise all the retry paths.
* bugfix: account for the new recording's bytes when calculating how much to
  delete.
* bugfix: when retrying an unlink failure in collect_garbage, we shouldn't
  warn about all the recordings no longer existing. Do this by retrying each
  step rather than the whole procedure again.
* avoid double-panic scenarios, which I hit while tweaking the mocks. These
  are quite annoying to debug as Rust doesn't print information about either
  panic. I ended up using lldb to get a backtrace. Better to be cautious about
  what we're doing when already panicking.
* give more context on raw::insert_recording errors, which I hit as well while
  tweaking the new tests.
This commit is contained in:
Scott Lamb
2018-03-04 12:24:24 -08:00
parent b78ffc3808
commit d6fa470713
18 changed files with 1182 additions and 732 deletions

View File

@@ -15,8 +15,9 @@ failure = "0.1.1"
fnv = "1.0"
lazy_static = "1.0"
libc = "0.2"
log = { version = "0.4", features = ["release_max_level_info"] }
log = "0.4"
lru-cache = "0.1"
moonfire-base = { path = "../base" }
mylog = { git = "https://github.com/scottlamb/mylog" }
openssl = "0.10"
parking_lot = { version = "0.5", features = [] }

View File

@@ -1697,6 +1697,9 @@ pub struct Database(
impl Drop for Database {
fn drop(&mut self) {
if ::std::thread::panicking() {
return; // don't flush while panicking.
}
if let Some(m) = self.0.take() {
if let Err(e) = m.into_inner().flush("drop") {
error!("Final database flush failed: {}", e);

689
db/dir.rs
View File

@@ -1,5 +1,5 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -32,16 +32,11 @@
//!
//! This includes opening files for serving, rotating away old files, and saving new files.
use db::{self, CompositeId};
use db::CompositeId;
use failure::{Error, Fail};
use fnv::FnvHashMap;
use libc::{self, c_char};
use parking_lot::Mutex;
use protobuf::{self, Message};
use recording;
use openssl::hash;
use schema;
use std::cmp;
use std::ffi;
use std::fs;
use std::io::{self, Read, Write};
@@ -49,8 +44,6 @@ use std::mem;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::FromRawFd;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
/// A sample file directory. Typically one per physical disk drive.
///
@@ -257,11 +250,16 @@ impl SampleFileDir {
}
/// Opens the given sample file for reading.
pub fn open_sample_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
pub fn open_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
let p = SampleFileDir::get_rel_pathname(composite_id);
unsafe { self.fd.openat(p.as_ptr(), libc::O_RDONLY, 0) }
}
pub fn create_file(&self, composite_id: CompositeId) -> Result<fs::File, io::Error> {
let p = SampleFileDir::get_rel_pathname(composite_id);
unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) }
}
pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
write_meta(&self.fd, meta)
}
@@ -278,9 +276,9 @@ impl SampleFileDir {
}
/// Unlinks the given sample file within this directory.
fn unlink(fd: &Fd, id: CompositeId) -> Result<(), io::Error> {
pub(crate) fn unlink_file(&self, id: CompositeId) -> Result<(), io::Error> {
let p = SampleFileDir::get_rel_pathname(id);
let res = unsafe { libc::unlinkat(fd.0, p.as_ptr(), 0) };
let res = unsafe { libc::unlinkat(self.fd.0, p.as_ptr(), 0) };
if res < 0 {
return Err(io::Error::last_os_error())
}
@@ -288,612 +286,11 @@ impl SampleFileDir {
}
/// Syncs the directory itself.
fn sync(&self) -> Result<(), io::Error> {
pub(crate) fn sync(&self) -> Result<(), io::Error> {
self.fd.sync()
}
}
/// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
enum SyncerCommand {
AsyncSaveRecording(CompositeId, fs::File),
DatabaseFlushed,
Flush(mpsc::SyncSender<()>),
}
/// A channel which can be used to send commands to the syncer.
/// Can be cloned to allow multiple threads to send commands.
#[derive(Clone)]
pub struct SyncerChannel(mpsc::Sender<SyncerCommand>);
/// State of the worker thread.
struct Syncer {
dir_id: i32,
dir: Arc<SampleFileDir>,
db: Arc<db::Database>,
}
/// Starts a syncer for the given sample file directory.
///
/// The lock must not be held on `db` when this is called.
///
/// There should be only one syncer per directory, or 0 if operating in read-only mode.
/// This function will perform the initial rotation synchronously, so that it is finished before
/// file writing starts. Afterward the syncing happens in a background thread.
///
/// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
/// a `JoinHandle` for the syncer thread. Commands sent on the channel will be executed or retried
/// forever. (TODO: provide some manner of pushback during retry.) At program shutdown, all
/// `SyncerChannel` clones should be dropped and then the handle joined to allow all recordings to
/// be persisted.
///
/// Note that dropping all `SyncerChannel` clones currently includes calling
/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
-> Result<(SyncerChannel, thread::JoinHandle<()>), Error> {
let db2 = db.clone();
let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
syncer.initial_rotation()?;
let (snd, rcv) = mpsc::channel();
db.lock().on_flush(Box::new({
let snd = snd.clone();
move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
}
}));
Ok((SyncerChannel(snd),
thread::Builder::new()
.name(format!("sync-{}", path))
.spawn(move || syncer.run(rcv)).unwrap()))
}
pub struct NewLimit {
pub stream_id: i32,
pub limit: i64,
}
/// Deletes recordings if necessary to fit within the given new `retain_bytes` limit.
/// Note this doesn't change the limit in the database; it only deletes files.
/// Pass a limit of 0 to delete all recordings associated with a camera.
pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
-> Result<(), Error> {
let db2 = db.clone();
let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
syncer.do_rotation(|db| {
for l in limits {
let (bytes_before, extra);
{
let stream = db.streams_by_id().get(&l.stream_id)
.ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
bytes_before = stream.sample_file_bytes + stream.bytes_to_add -
stream.bytes_to_delete;
extra = stream.retain_bytes - l.limit;
}
if l.limit >= bytes_before { continue }
delete_recordings(db, l.stream_id, extra)?;
let stream = db.streams_by_id().get(&l.stream_id).unwrap();
info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete);
}
Ok(())
})
}
/// Deletes recordings to bring a stream's disk usage within bounds.
fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
extra_bytes_needed: i64) -> Result<(), Error> {
let bytes_needed = {
let stream = match db.streams_by_id().get(&stream_id) {
None => bail!("no stream {}", stream_id),
Some(s) => s,
};
stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed
- stream.retain_bytes
};
let mut bytes_to_delete = 0;
if bytes_needed <= 0 {
debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
return Ok(());
}
let mut n = 0;
db.delete_oldest_recordings(stream_id, &mut |row| {
n += 1;
if bytes_needed >= bytes_to_delete {
bytes_to_delete += row.sample_file_bytes as i64;
n += 1;
return true;
}
false
})?;
info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
stream_id, bytes_to_delete, n, bytes_needed);
Ok(())
}
impl SyncerChannel {
/// Asynchronously syncs the given writer, closes it, records it into the database, and
/// starts rotation.
fn async_save_recording(&self, id: CompositeId, f: fs::File) {
self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap();
}
/// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
pub fn flush(&self) {
let (snd, rcv) = mpsc::sync_channel(0);
self.0.send(SyncerCommand::Flush(snd)).unwrap();
rcv.recv().unwrap_err(); // syncer should just drop the channel, closing it.
}
}
impl Syncer {
fn new(l: &db::LockedDatabase, db: Arc<db::Database>, dir_id: i32)
-> Result<(Self, String), Error> {
let d = l.sample_file_dirs_by_id()
.get(&dir_id)
.ok_or_else(|| format_err!("no dir {}", dir_id))?;
let dir = d.get()?;
// Abandon files.
// First, get a list of the streams in question.
let streams_to_next: FnvHashMap<_, _> =
l.streams_by_id()
.iter()
.filter_map(|(&k, v)| {
if v.sample_file_dir_id == Some(dir_id) {
Some((k, v.next_recording_id))
} else {
None
}
})
.collect();
let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
let mut undeletable = 0;
for &id in &to_abandon {
if let Err(e) = SampleFileDir::unlink(&dir.fd, id) {
if e.kind() == io::ErrorKind::NotFound {
warn!("dir: abandoned recording {} already deleted!", id);
} else {
warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
undeletable += 1;
}
}
}
if undeletable > 0 {
bail!("Unable to delete {} abandoned recordings.", undeletable);
}
Ok((Syncer {
dir_id,
dir,
db,
}, d.path.clone()))
}
/// Lists files which should be "abandoned" (deleted without ever recording in the database)
/// on opening.
fn list_files_to_abandon(path: &str, streams_to_next: FnvHashMap<i32, i32>)
-> Result<Vec<CompositeId>, Error> {
let mut v = Vec::new();
for e in ::std::fs::read_dir(path)? {
let e = e?;
let id = match parse_id(e.file_name().as_bytes()) {
Ok(i) => i,
Err(_) => continue,
};
let next = match streams_to_next.get(&id.stream()) {
Some(n) => *n,
None => continue, // unknown stream.
};
if id.recording() >= next {
v.push(id);
}
}
Ok(v)
}
fn run(&mut self, cmds: mpsc::Receiver<SyncerCommand>) {
loop {
match cmds.recv() {
Err(_) => return, // all senders have closed the channel; shutdown
Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f),
Ok(SyncerCommand::DatabaseFlushed) => {
retry_forever(&mut || self.collect_garbage(true))
},
Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it.
};
}
}
/// Rotates files for all streams and deletes stale files from previous runs.
/// Called from main thread.
fn initial_rotation(&mut self) -> Result<(), Error> {
self.do_rotation(|db| {
let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
for &stream_id in &streams {
delete_recordings(db, stream_id, 0)?;
}
Ok(())
})
}
/// Helper to do initial or retention-lowering rotation. Called from main thread.
fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> {
{
let mut db = self.db.lock();
delete_recordings(&mut *db)?;
db.flush("synchronous deletion")?;
}
self.collect_garbage(false)?;
self.db.lock().flush("synchronous garbage collection")
}
/// Helper for collecting garbage; called from main or worker threads.
fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> {
let mut garbage: Vec<_> = {
let l = self.db.lock();
let d = l.sample_file_dirs_by_id().get(&self.dir_id).unwrap();
d.garbage.iter().map(|id| *id).collect()
};
let len_before = garbage.len();
garbage.retain(|&id| {
if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) {
if e.kind() == io::ErrorKind::NotFound {
if warn_on_missing {
warn!("dir: recording {} already deleted!", id);
}
} else {
warn!("dir: Unable to unlink {}: {}", id, e);
return false;
}
}
true
});
let res = if len_before > garbage.len() {
Err(format_err!("Unable to unlink {} files (see earlier warning messages for details)",
len_before - garbage.len()))
} else {
Ok(())
};
if garbage.is_empty() {
// No progress.
return res;
}
if let Err(e) = self.dir.sync() {
error!("unable to sync dir: {}", e);
return res.and(Err(e.into()));
}
if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) {
error!("unable to delete garbage ({} files) for dir {}: {}",
self.dir_id, garbage.len(), e);
return res.and(Err(e.into()));
}
res
}
/// Saves the given recording and causes rotation to happen. Called from worker thread.
///
/// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
/// so that there can be only one dir sync and database transaction per save.
/// Internal helper for `save`. This is separated out so that the question-mark operator
/// can be used in the many error paths.
fn save(&mut self, id: CompositeId, f: fs::File) {
let stream_id = id.stream();
// Free up a like number of bytes.
retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0));
retry_forever(&mut || f.sync_all());
retry_forever(&mut || self.dir.sync());
let mut db = self.db.lock();
db.mark_synced(id).unwrap();
let reason = {
let s = db.streams_by_id().get(&stream_id).unwrap();
let c = db.cameras_by_id().get(&s.camera_id).unwrap();
let unflushed = s.unflushed();
if unflushed < s.flush_if {
debug!("{}-{}: unflushed={} < if={}, not flushing",
c.short_name, s.type_.as_str(), unflushed, s.flush_if);
return;
}
format!("{}-{}: unflushed={} >= if={}",
c.short_name, s.type_.as_str(), unflushed, s.flush_if)
};
if let Err(e) = db.flush(&reason) {
// Don't retry the commit now in case it causes extra flash write cycles.
// It's not necessary for correctness to flush before proceeding.
// Just wait until the next flush would happen naturally.
warn!("flush failure on save for reason {}; leaving unflushed for now: {:?}",
reason, e);
}
}
}
fn retry_forever<T, E: Into<Error>>(f: &mut FnMut() -> Result<T, E>) -> T {
let sleep_time = ::std::time::Duration::new(1, 0);
loop {
let e = match f() {
Ok(t) => return t,
Err(e) => e.into(),
};
warn!("sleeping for {:?} after error: {:?}", sleep_time, e);
thread::sleep(sleep_time);
}
}
/// Struct for writing a single run (of potentially several recordings) to disk and committing its
/// metadata to the database. `Writer` hands off each recording's state to the syncer when done. It
/// saves the recording to the database (if I/O errors do not prevent this), retries forever,
/// or panics (if further writing on this stream is impossible).
pub struct Writer<'a> {
dir: &'a SampleFileDir,
db: &'a db::Database,
channel: &'a SyncerChannel,
stream_id: i32,
video_sample_entry_id: i32,
state: WriterState,
}
enum WriterState {
Unopened,
Open(InnerWriter),
Closed(PreviousWriter),
}
/// State for writing a single recording, used within `Writer`.
///
/// Note that the recording created by every `InnerWriter` must be written to the `SyncerChannel`
/// with at least one sample. The sample may have zero duration.
struct InnerWriter {
f: fs::File,
r: Arc<Mutex<db::RecordingToInsert>>,
e: recording::SampleIndexEncoder,
id: CompositeId,
hasher: hash::Hasher,
/// The start time of this segment, based solely on examining the local clock after frames in
/// this segment were received. Frames can suffer from various kinds of delay (initial
/// buffering, encoding, and network transmission), so this time is set to far in the future on
/// construction, given a real value on the first packet, and decreased as less-delayed packets
/// are discovered. See design/time.md for details.
local_start: recording::Time,
adjuster: ClockAdjuster,
/// A sample which has been written to disk but not added to `index`. Index writes are one
/// sample behind disk writes because the duration of a sample is the difference between its
/// pts and the next sample's pts. A sample is flushed when the next sample is written, when
/// the writer is closed cleanly (the caller supplies the next pts), or when the writer is
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
///
/// Invariant: this should always be `Some` (briefly violated during `write` call only).
unflushed_sample: Option<UnflushedSample>,
}
/// Adjusts durations given by the camera to correct its clock frequency error.
#[derive(Copy, Clone, Debug)]
struct ClockAdjuster {
/// Every `every_minus_1 + 1` units, add `-ndir`.
/// Note i32::max_value() disables adjustment.
every_minus_1: i32,
/// Should be 1 or -1 (unless disabled).
ndir: i32,
/// Keeps accumulated difference from previous values.
cur: i32,
}
impl ClockAdjuster {
fn new(local_time_delta: Option<i64>) -> Self {
// Pick an adjustment rate to correct local_time_delta over the next minute (the
// desired duration of a single recording). Cap the rate at 500 ppm (which corrects
// 2,700/90,000ths of a second over a minute) to prevent noticeably speeding up or slowing
// down playback.
let (every_minus_1, ndir) = match local_time_delta {
Some(d) if d <= -2700 => (1999, 1),
Some(d) if d >= 2700 => (1999, -1),
Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1),
Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1),
_ => (i32::max_value(), 0),
};
ClockAdjuster{
every_minus_1,
ndir,
cur: 0,
}
}
fn adjust(&mut self, mut val: i32) -> i32 {
self.cur += val;
// The "val > self.ndir" here is so that if decreasing durations (ndir == 1), we don't
// cause a duration of 1 to become a duration of 0. It has no effect when increasing
// durations. (There's no danger of a duration of 0 becoming a duration of 1; cur wouldn't
// be newly > self.every_minus_1.)
while self.cur > self.every_minus_1 && val > self.ndir {
val -= self.ndir;
self.cur -= self.every_minus_1 + 1;
}
val
}
}
#[derive(Copy, Clone)]
struct UnflushedSample {
local_time: recording::Time,
pts_90k: i64,
len: i32,
is_key: bool,
}
/// State associated with a run's previous recording; used within `Writer`.
#[derive(Copy, Clone)]
struct PreviousWriter {
end: recording::Time,
local_time_delta: recording::Duration,
run_offset: i32,
}
impl<'a> Writer<'a> {
pub fn new(dir: &'a SampleFileDir, db: &'a db::Database, channel: &'a SyncerChannel,
stream_id: i32, video_sample_entry_id: i32) -> Self {
Writer {
dir,
db,
channel,
stream_id,
video_sample_entry_id,
state: WriterState::Unopened,
}
}
/// Opens a new writer.
/// This returns a writer that violates the invariant that `unflushed_sample` is `Some`.
/// The caller (`write`) is responsible for correcting this.
fn open(&mut self) -> Result<&mut InnerWriter, Error> {
let prev = match self.state {
WriterState::Unopened => None,
WriterState::Open(ref mut w) => return Ok(w),
WriterState::Closed(prev) => Some(prev),
};
let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert {
run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0),
start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())),
video_sample_entry_id: self.video_sample_entry_id,
flags: db::RecordingFlags::Growing as i32,
..Default::default()
})?;
let p = SampleFileDir::get_rel_pathname(id);
let f = retry_forever(&mut || unsafe {
self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600)
});
self.state = WriterState::Open(InnerWriter {
f,
r,
e: recording::SampleIndexEncoder::new(),
id,
hasher: hash::Hasher::new(hash::MessageDigest::sha1())?,
local_start: recording::Time(i64::max_value()),
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
unflushed_sample: None,
});
match self.state {
WriterState::Open(ref mut w) => Ok(w),
_ => unreachable!(),
}
}
pub fn previously_opened(&self) -> Result<bool, Error> {
Ok(match self.state {
WriterState::Unopened => false,
WriterState::Closed(_) => true,
WriterState::Open(_) => bail!("open!"),
})
}
/// Writes a new frame to this segment.
/// `local_time` should be the local clock's time as of when this packet was received.
pub fn write(&mut self, pkt: &[u8], local_time: recording::Time, pts_90k: i64,
is_key: bool) -> Result<(), Error> {
let w = self.open()?;
// Note w's invariant that `unflushed_sample` is `None` may currently be violated.
// We must restore it on all success or error paths.
if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k) as i32;
if duration <= 0 {
// Restore invariant.
w.unflushed_sample = Some(unflushed);
bail!("pts not monotonically increasing; got {} then {}",
unflushed.pts_90k, pts_90k);
}
let duration = w.adjuster.adjust(duration);
w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
}
let mut remaining = pkt;
while !remaining.is_empty() {
let written = retry_forever(&mut || w.f.write(remaining));
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample {
local_time,
pts_90k,
len: pkt.len() as i32,
is_key,
});
w.hasher.update(pkt).unwrap();
Ok(())
}
/// Cleanly closes the writer, using a supplied pts of the next sample for the last sample's
/// duration (if known). If `close` is not called, the `Drop` trait impl will close the trait,
/// swallowing errors and using a zero duration for the last sample.
pub fn close(&mut self, next_pts: Option<i64>) {
self.state = match mem::replace(&mut self.state, WriterState::Unopened) {
WriterState::Open(w) => {
let prev = w.close(self.channel, next_pts);
WriterState::Closed(prev)
},
s => s,
};
}
}
impl InnerWriter {
fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool,
pkt_local_time: recording::Time) {
let mut l = self.r.lock();
self.e.add_sample(duration_90k, bytes, is_key, &mut l);
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
self.local_start = cmp::min(self.local_start, new);
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
l.start = self.local_start;
}
}
fn close(mut self, channel: &SyncerChannel, next_pts: Option<i64>) -> PreviousWriter {
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
let (duration, flags) = match next_pts {
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
};
let mut sha1_bytes = [0u8; 20];
sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]);
let (local_time_delta, run_offset, end);
self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time);
{
let mut l = self.r.lock();
l.flags = flags;
local_time_delta = self.local_start - l.start;
l.local_time_delta = local_time_delta;
l.sample_file_sha1 = sha1_bytes;
run_offset = l.run_offset;
end = l.start + recording::Duration(l.duration_90k as i64);
}
drop(self.r);
channel.async_save_recording(self.id, self.f);
PreviousWriter {
end,
local_time_delta,
run_offset,
}
}
}
impl<'a> Drop for Writer<'a> {
fn drop(&mut self) {
if let WriterState::Open(w) = mem::replace(&mut self.state, WriterState::Unopened) {
// Swallow any error. The caller should only drop the Writer without calling close()
// if there's already been an error. The caller should report that. No point in
// complaining again.
let _ = w.close(self.channel, None);
}
}
}
/// Parse a composite id filename.
///
/// These are exactly 16 bytes, lowercase hex.
@@ -914,68 +311,6 @@ pub(crate) fn parse_id(id: &[u8]) -> Result<CompositeId, ()> {
#[cfg(test)]
mod tests {
use super::ClockAdjuster;
use testutil;
#[test]
fn adjust() {
testutil::init();
// no-ops.
for v in &[None, Some(0), Some(-10), Some(10)] {
let mut a = ClockAdjuster::new(*v);
for _ in 0..1800 {
assert_eq!(3000, a.adjust(3000), "v={:?}", *v);
}
}
// typical, 100 ppm adjustment.
let mut a = ClockAdjuster::new(Some(-540));
let mut total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2999 || new == 3000);
total += new;
}
let expected = 1800*3000 - 540;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
a = ClockAdjuster::new(Some(540));
let mut total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 3000 || new == 3001);
total += new;
}
let expected = 1800*3000 + 540;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
// capped at 500 ppm (change of 2,700/90,000ths over 1 minute).
a = ClockAdjuster::new(Some(-1_000_000));
total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 2998 || new == 2999, "new={}", new);
total += new;
}
let expected = 1800*3000 - 2700;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
a = ClockAdjuster::new(Some(1_000_000));
total = 0;
for _ in 0..1800 {
let new = a.adjust(3000);
assert!(new == 3001 || new == 3002, "new={}", new);
total += new;
}
let expected = 1800*3000 + 2700;
assert!(total == expected || total == expected + 1, "total={} vs expected={}",
total, expected);
}
#[test]
fn parse_id() {
use super::parse_id;

View File

@@ -36,12 +36,14 @@ extern crate fnv;
extern crate libc;
#[macro_use] extern crate log;
extern crate lru_cache;
extern crate moonfire_base as base;
extern crate mylog;
extern crate openssl;
extern crate parking_lot;
extern crate protobuf;
extern crate regex;
extern crate rusqlite;
extern crate tempdir;
extern crate time;
extern crate uuid;
@@ -53,6 +55,7 @@ mod raw;
pub mod recording;
mod schema;
pub mod upgrade;
pub mod writer;
// This is only for #[cfg(test)], but it's also used by the dependent crate, and it appears that
// #[cfg(test)] is not passed on to dependencies.

View File

@@ -31,7 +31,7 @@
//! Raw database access: SQLite statements which do not touch any cached state.
use db::{self, CompositeId, FromSqlUuid};
use failure::Error;
use failure::{Error, ResultExt};
use fnv::FnvHashSet;
use recording;
use rusqlite;
@@ -191,7 +191,8 @@ pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result<Uuid, Error> {
/// Inserts the specified recording (for from `try_flush` only).
pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
r: &db::RecordingToInsert) -> Result<(), Error> {
let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)?;
let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)
.with_context(|e| format!("can't prepare recording insert: {}", e))?;
stmt.execute_named(&[
(":composite_id", &id.0),
(":stream_id", &(id.stream() as i64)),
@@ -205,15 +206,16 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
(":video_samples", &r.video_samples),
(":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id),
])?;
]).with_context(|e| format!("unable to insert recording for {:#?}: {}", r, e))?;
let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)
.with_context(|e| format!("can't prepare recording_playback insert: {}", e))?;
let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[
(":composite_id", &id.0),
(":sample_file_sha1", &sha1),
(":video_index", &r.video_index),
])?;
]).with_context(|e| format!("unable to insert recording_playback for {:#?}: {}", r, e))?;
Ok(())
}

View File

@@ -28,8 +28,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
extern crate tempdir;
use db;
use dir;
use fnv::FnvHashMap;
@@ -38,8 +36,10 @@ use rusqlite;
use std::env;
use std::sync::{self, Arc};
use std::thread;
use tempdir::TempDir;
use time;
use uuid::Uuid;
use writer;
static INIT: sync::Once = sync::ONCE_INIT;
@@ -66,16 +66,16 @@ pub fn init() {
pub struct TestDb {
pub db: Arc<db::Database>,
pub dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<dir::SampleFileDir>>>,
pub syncer_channel: dir::SyncerChannel,
pub syncer_channel: writer::SyncerChannel<::std::fs::File>,
pub syncer_join: thread::JoinHandle<()>,
pub tmpdir: tempdir::TempDir,
pub tmpdir: TempDir,
pub test_camera_uuid: Uuid,
}
impl TestDb {
/// Creates a test database with one camera.
pub fn new() -> TestDb {
let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap();
let tmpdir = TempDir::new("moonfire-nvr-test").unwrap();
let mut conn = rusqlite::Connection::open_in_memory().unwrap();
db::Database::init(&mut conn).unwrap();
@@ -113,7 +113,7 @@ impl TestDb {
let mut dirs_by_stream_id = FnvHashMap::default();
dirs_by_stream_id.insert(TEST_STREAM_ID, dir.clone());
let (syncer_channel, syncer_join) =
dir::start_syncer(db.clone(), sample_file_dir_id).unwrap();
writer::start_syncer(db.clone(), sample_file_dir_id).unwrap();
TestDb {
db,
dirs_by_stream_id: Arc::new(dirs_by_stream_id),

1022
db/writer.rs Normal file

File diff suppressed because it is too large Load Diff