knob to reduce db commits (SSD write cycles)

This improves the practicality of having many streams (including the doubling
of streams by having main + sub streams for each camera). With flush_if_sec
tuned properly, extra streams don't cause any extra write cycles in normal or error
cases. Consider the worst case in which each RTSP session immediately sends a
single frame and then fails. Moonfire retries every second, so this would
formerly cause one commit per second per stream. (flush_if_sec=0 preserves
this behavior.) Now the commits can be arbitrarily infrequent by setting
higher values of flush_if_sec.
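In sketch form, the per-save decision is just a threshold comparison.
(Simplified types here; the real check is the `unflushed >= flush_if`
comparison in Syncer::save in the diff below.)

    // Sketch only: a simplified stand-in for the per-stream state; the real
    // fields live on db::Stream (see `unflushed()` and `flush_if` below).
    struct StreamState {
        unflushed_sec: i64, // seconds of recordings written but not committed
        flush_if_sec: i64,  // the new knob
    }

    fn should_commit(s: &StreamState) -> bool {
        // flush_if_sec=0 preserves the old commit-per-recording behavior;
        // larger values batch many recordings into a single SQLite commit.
        s.unflushed_sec >= s.flush_if_sec
    }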

WARNING: this isn't production-ready! I hacked up dir.rs to make tests pass
and "moonfire-nvr run" work in the best-case scenario, but it doesn't handle
errors gracefully. I've been debating what to do when writing a recording
fails. I considered "abandoning" the recording, then either reusing or skipping
its id (in the latter case, marking the file as garbage if it can't be
unlinked immediately). I now think there's no point in abandoning a recording.
If I can't write to that file, there's no reason to believe another will work
better. It's better to retry that recording forever, and perhaps put the whole
directory into an error state that stops recording until those writes go
through. I'm planning to redesign dir.rs to make this happen.
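In rough pseudocode, the direction I have in mind (hypothetical names; not
implemented in this commit):

    // Hypothetical sketch of the planned retry-forever behavior.
    enum DirState {
        Healthy,
        // Writes are failing; stop recording until a retry succeeds.
        Broken { consecutive_failures: u32 },
    }

    fn note_write_result(state: &mut DirState, result: std::io::Result<()>) {
        match result {
            Ok(()) => *state = DirState::Healthy,
            Err(_) => {
                let n = match *state {
                    DirState::Broken { consecutive_failures } => consecutive_failures + 1,
                    DirState::Healthy => 1,
                };
                // Keep retrying the same recording: if this file can't be
                // written, there's no reason to believe another would fare
                // better.
                *state = DirState::Broken { consecutive_failures: n };
            },
        }
    }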
Scott Lamb
2018-02-22 16:35:34 -08:00
parent 31adbc1e9f
commit b037c9bdd7
15 changed files with 822 additions and 694 deletions

db/dir.rs (301 changed lines)

@@ -36,6 +36,7 @@ use db::{self, CompositeId};
 use failure::{Error, Fail};
 use fnv::FnvHashMap;
 use libc::{self, c_char};
+use parking_lot::Mutex;
 use protobuf::{self, Message};
 use recording;
 use openssl::hash;
@@ -47,7 +48,7 @@ use std::io::{self, Read, Write};
 use std::mem;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::FromRawFd;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::mpsc;
 use std::thread;
@@ -62,9 +63,6 @@ pub struct SampleFileDir {
     /// The open file descriptor for the directory. The worker uses it to create files and sync the
     /// directory. Other threads use it to open sample files for reading during video serving.
     fd: Fd,
-
-    // Lock order: don't acquire mutable.lock() while holding db.lock().
-    mutable: Mutex<SharedMutableState>,
 }
 
 /// A file descriptor associated with a directory (not necessarily the sample file dir).
@@ -199,9 +197,6 @@ impl SampleFileDir {
             .map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?;
         Ok(Arc::new(SampleFileDir {
             fd,
-            mutable: Mutex::new(SharedMutableState{
-                next_id_by_stream: FnvHashMap::default(),
-            }),
         }))
     }
@@ -258,40 +253,11 @@ impl SampleFileDir {
                             prev: Option<PreviousWriter>, stream_id: i32,
                             video_sample_entry_id: i32)
                             -> Result<Writer<'a>, Error> {
-        // Grab the next id. The dir itself will typically have an id (possibly one ahead of what's
-        // stored in the database), but not on the first attempt for a stream.
-        use std::collections::hash_map::Entry;
-        let recording_id;
-        match self.mutable.lock().unwrap().next_id_by_stream.entry(stream_id) {
-            Entry::Occupied(mut e) => {
-                let v = e.get_mut();
-                recording_id = *v;
-                *v += 1;
-            },
-            Entry::Vacant(e) => {
-                let mut l = db.lock();
-                recording_id = l.streams_by_id().get(&stream_id).unwrap().next_recording_id;
-                e.insert(recording_id + 1);
-            },
-        };
-        let id = CompositeId::new(stream_id, recording_id);
+        let (id, r) = db.lock().add_recording(stream_id)?;
         let p = SampleFileDir::get_rel_pathname(id);
-        let f = match unsafe { self.fd.openat(p.as_ptr(),
-                                              libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT,
-                                              0o600) } {
-            Ok(f) => f,
-            Err(e) => {
-                // Put the id back to try again later.
-                let mut l = self.mutable.lock().unwrap();
-                let v = l.next_id_by_stream.get_mut(&stream_id).unwrap();
-                assert_eq!(*v, recording_id + 1);
-                *v -= 1;
-                return Err(e.into());
-            },
-        };
-        Writer::open(f, id, prev, video_sample_entry_id, channel)
+        let f = unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT,
+                                        0o600) }.unwrap();  // TODO: don't unwrap!
+        Writer::open(f, id, r, prev, video_sample_entry_id, channel)
     }
 
     pub fn statfs(&self) -> Result<libc::statvfs, io::Error> { self.fd.statfs() }
@@ -325,16 +291,11 @@ impl SampleFileDir {
     }
 }
 
-/// State shared between users of the `SampleFileDirectory` struct and the syncer.
-#[derive(Debug)]
-struct SharedMutableState {
-    next_id_by_stream: FnvHashMap<i32, i32>,
-}
-
 /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct.
 enum SyncerCommand {
-    AsyncSaveRecording(db::RecordingToInsert, fs::File),
-    AsyncAbandonRecording(CompositeId),
+    AsyncSaveRecording(CompositeId, Arc<Mutex<db::UncommittedRecording>>, fs::File),
+    //AsyncAbandonRecording(CompositeId),
+    DatabaseFlushed,
     Flush(mpsc::SyncSender<()>),
 }
@@ -345,20 +306,9 @@ pub struct SyncerChannel(mpsc::Sender<SyncerCommand>);
 /// State of the worker thread.
 struct Syncer {
+    dir_id: i32,
     dir: Arc<SampleFileDir>,
     db: Arc<db::Database>,
-
-    /// Files to be unlinked then immediately forgotten about. They are `>= next_recording_id` for
-    /// their stream, `next_recording_id` won't be advanced without a sync of the directory, and
-    /// extraneous files `>= next_recording_id` are unlinked on startup, so this should be
-    /// sufficient.
-    to_abandon: Vec<CompositeId>,
-
-    /// Files to be unlinked then removed from the garbage table.
-    to_unlink: Vec<CompositeId>,
-
-    /// Files to be removed from the garbage table.
-    to_mark_deleted: Vec<CompositeId>,
 }
 
 /// Starts a syncer for the given sample file directory.
@@ -371,13 +321,23 @@ struct Syncer {
 ///
 /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and
 /// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be
-/// removed and then the handle joined to allow all recordings to be persisted.
+/// dropped and then the handle joined to allow all recordings to be persisted.
+///
+/// Note that dropping all `SyncerChannel` clones currently includes calling
+/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes.
+/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically.
 pub fn start_syncer(db: Arc<db::Database>, dir_id: i32)
                     -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> {
     let db2 = db.clone();
     let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?;
     syncer.initial_rotation()?;
     let (snd, rcv) = mpsc::channel();
+    db.lock().on_flush(Box::new({
+        let snd = snd.clone();
+        move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) {
+            warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e);
+        }
+    }));
     Ok((SyncerChannel(snd),
         thread::Builder::new()
             .name(format!("sync-{}", path))
@@ -440,13 +400,14 @@ fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32,
 impl SyncerChannel {
     /// Asynchronously syncs the given writer, closes it, records it into the database, and
     /// starts rotation.
-    fn async_save_recording(&self, recording: db::RecordingToInsert, f: fs::File) {
-        self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap();
+    fn async_save_recording(&self, id: CompositeId, recording: Arc<Mutex<db::UncommittedRecording>>,
+                            f: fs::File) {
+        self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap();
     }
 
-    fn async_abandon_recording(&self, id: CompositeId) {
-        self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap();
-    }
+    //fn async_abandon_recording(&self, id: CompositeId) {
+    //    self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap();
+    //}
 
     /// For testing: flushes the syncer, waiting for all currently-queued commands to complete.
     pub fn flush(&self) {
@@ -463,9 +424,8 @@ impl Syncer {
             .get(&dir_id)
             .ok_or_else(|| format_err!("no dir {}", dir_id))?;
         let dir = d.get()?;
-        let to_unlink = l.list_garbage(dir_id)?;
 
-        // Get files to abandon.
+        // Abandon files.
         // First, get a list of the streams in question.
         let streams_to_next: FnvHashMap<_, _> =
             l.streams_by_id()
@@ -479,13 +439,25 @@ impl Syncer {
                 })
                 .collect();
         let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?;
+        let mut undeletable = 0;
+        for &id in &to_abandon {
+            if let Err(e) = SampleFileDir::unlink(&dir.fd, id) {
+                if e.kind() == io::ErrorKind::NotFound {
+                    warn!("dir: abandoned recording {} already deleted!", id);
+                } else {
+                    warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
+                    undeletable += 1;
+                }
+            }
+        }
+        if undeletable > 0 {
+            bail!("Unable to delete {} abandoned recordings.", undeletable);
+        }
 
         Ok((Syncer {
+            dir_id,
             dir,
             db,
-            to_abandon,
-            to_unlink,
-            to_mark_deleted: Vec::new(),
         }, d.path.clone()))
     }
@@ -515,8 +487,9 @@ impl Syncer {
         loop {
             match cmds.recv() {
                 Err(_) => return,  // all senders have closed the channel; shutdown
-                Ok(SyncerCommand::AsyncSaveRecording(recording, f)) => self.save(recording, f),
-                Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid),
+                Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f),
+                //Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid),
+                Ok(SyncerCommand::DatabaseFlushed) => { let _ = self.collect_garbage(true); },
                 Ok(SyncerCommand::Flush(_)) => {},  // just drop the supplied sender, closing it.
             };
         }
@@ -535,106 +508,102 @@ impl Syncer {
     fn do_rotation<F>(&mut self, get_rows_to_delete: F) -> Result<(), Error>
     where F: FnOnce(&db::LockedDatabase) -> Result<Vec<db::ListOldestSampleFilesRow>, Error> {
-        let to_delete = {
-            let mut db = self.db.lock();
-            let to_delete = get_rows_to_delete(&*db)?;
-            let mut tx = db.tx()?;
-            tx.delete_recordings(&to_delete)?;
-            tx.commit()?;
-            to_delete
-        };
-        for row in to_delete {
-            self.to_unlink.push(row.id);
-        }
-        self.try_unlink();
-        if !self.to_unlink.is_empty() {
-            bail!("failed to unlink {} sample files", self.to_unlink.len());
-        }
-        self.dir.sync()?;
         {
             let mut db = self.db.lock();
-            let mut tx = db.tx()?;
-            tx.mark_sample_files_deleted(&self.to_mark_deleted)?;
-            tx.commit()?;
+            let mut to_delete = get_rows_to_delete(&*db)?;
+            db.delete_recordings(&mut to_delete);
+            db.flush("synchronous deletion")?;
         }
-        self.to_mark_deleted.clear();
-        Ok(())
+        self.collect_garbage(false)?;
+        self.db.lock().flush("synchronous garbage collection")
     }
 
+    fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> {
+        let mut garbage: Vec<_> = {
+            let l = self.db.lock();
+            let d = match l.sample_file_dirs_by_id().get(&self.dir_id) {
+                None => {
+                    error!("can't find dir {} in db!", self.dir_id);
+                    bail!("can't find dir {} in db!", self.dir_id);
+                },
+                Some(d) => d,
+            };
+            d.garbage.iter().map(|id| *id).collect()
+        };
+        let len_before = garbage.len();
+        garbage.retain(|&id| {
+            if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) {
+                if e.kind() == io::ErrorKind::NotFound {
+                    if warn_on_missing {
+                        warn!("dir: recording {} already deleted!", id);
+                    }
+                } else {
+                    warn!("dir: Unable to unlink {}: {}", id, e);
+                    return false;
+                }
+            }
+            true
+        });
+        let res = if len_before > garbage.len() {
+            Err(format_err!("Unable to unlink {} files", len_before - garbage.len()))
+        } else {
+            Ok(())
+        };
+        if garbage.is_empty() {
+            // No progress.
+            return res;
+        }
+        if let Err(e) = self.dir.sync() {
+            error!("unable to sync dir: {}", e);
+            return res.and(Err(e.into()));
+        }
+        if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) {
+            error!("unable to delete garbage ({} files) for dir {}: {}",
+                   self.dir_id, garbage.len(), e);
+            return res.and(Err(e.into()));
+        }
+        res
+    }
+
     /// Saves the given recording and causes rotation to happen.
     /// Note that part of rotation is deferred for the next cycle (saved writing or program startup)
     /// so that there can be only one dir sync and database transaction per save.
-    fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) {
-        if let Err(e) = self.save_helper(&recording, f) {
-            error!("will discard recording {} due to error while saving: {}", recording.id, e);
-            self.abandon(recording.id);
-            return;
-        }
-    }
-
-    fn abandon(&mut self, id: CompositeId) {
-        self.to_abandon.push(id);
-        self.try_unlink();
-    }
-
-    /// Internal helper for `save`. This is separated out so that the question-mark operator
-    /// can be used in the many error paths.
-    fn save_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File)
-                   -> Result<(), Error> {
-        self.try_unlink();
-        if !self.to_unlink.is_empty() {
-            bail!("failed to unlink {} files.", self.to_unlink.len());
-        }
-
-        f.sync_all()?;
-        self.dir.sync()?;
-
-        let mut to_delete = Vec::new();
-        let mut db = self.db.lock();
+    /// TODO: less unwrapping! keep a queue?
+    fn save(&mut self, id: CompositeId, recording: Arc<Mutex<db::UncommittedRecording>>,
+            f: fs::File) {
+        let stream_id = id.stream();
+
         // Free up a like number of bytes.
         {
-            let stream_id = recording.id.stream();
-            let stream =
-                db.streams_by_id().get(&stream_id)
-                  .ok_or_else(|| format_err!("no such stream {}", stream_id))?;
-            get_rows_to_delete(&db, stream_id, stream,
-                               recording.sample_file_bytes as i64, &mut to_delete)?;
+            let mut to_delete = Vec::new();
+            let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64;
+            let mut db = self.db.lock();
+            {
+                let stream = db.streams_by_id().get(&stream_id).unwrap();
+                get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap();
+            }
+            db.delete_recordings(&mut to_delete);
         }
-        let mut tx = db.tx()?;
-        tx.mark_sample_files_deleted(&self.to_mark_deleted)?;
-        tx.delete_recordings(&to_delete)?;
-        tx.insert_recording(recording)?;
-        tx.commit()?;
-        self.to_mark_deleted.clear();
-        self.to_unlink.extend(to_delete.iter().map(|row| row.id));
-        self.to_unlink.extend_from_slice(&self.to_abandon);
-        self.to_abandon.clear();
-        Ok(())
-    }
 
-    /// Tries to unlink all the files in `self.to_unlink` and `self.to_abandon`.
-    /// Any which can't be unlinked will be retained in the vec.
-    fn try_unlink(&mut self) {
-        let to_mark_deleted = &mut self.to_mark_deleted;
-        let fd = &self.dir.fd;
-        for &mut (ref mut v, mark_deleted) in &mut [(&mut self.to_unlink, true),
-                                                    (&mut self.to_abandon, false)] {
-            v.retain(|&id| {
-                if let Err(e) = SampleFileDir::unlink(fd, id) {
-                    if e.kind() == io::ErrorKind::NotFound {
-                        warn!("dir: recording {} already deleted!", id);
-                    } else {
-                        warn!("dir: Unable to unlink {}: {}", id, e);
-                        return true;
-                    }
-                }
-                if mark_deleted {
-                    to_mark_deleted.push(id);
-                }
-                false
-            });
-        }
+        // XXX: if these calls fail, any other writes are likely to fail as well.
+        f.sync_all().unwrap();
+        self.dir.sync().unwrap();
+        recording.lock().synced = true;
+        let mut db = self.db.lock();
+        let reason = {
+            let s = db.streams_by_id().get(&stream_id).unwrap();
+            let c = db.cameras_by_id().get(&s.camera_id).unwrap();
+            let unflushed = s.unflushed();
+            if unflushed < s.flush_if {
+                debug!("{}-{}: unflushed={} < if={}, not flushing",
+                       c.short_name, s.type_.as_str(), unflushed, s.flush_if);
+                return;
+            }
+            format!("{}-{}: unflushed={} >= if={}",
+                    c.short_name, s.type_.as_str(), unflushed, s.flush_if)
+        };
+        let _ = db.flush(&reason);
     }
 }
@@ -651,6 +620,7 @@ pub struct Writer<'a>(Option<InnerWriter<'a>>);
 struct InnerWriter<'a> {
     syncer_channel: &'a SyncerChannel,
     f: fs::File,
+    r: Arc<Mutex<db::UncommittedRecording>>,
     index: recording::SampleIndexEncoder,
     id: CompositeId,
     corrupt: bool,
@@ -744,11 +714,13 @@ pub struct PreviousWriter {
 impl<'a> Writer<'a> {
     /// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
-    fn open(f: fs::File, id: CompositeId, prev: Option<PreviousWriter>,
+    fn open(f: fs::File, id: CompositeId, r: Arc<Mutex<db::UncommittedRecording>>,
+            prev: Option<PreviousWriter>,
             video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
         Ok(Writer(Some(InnerWriter {
             syncer_channel,
             f,
+            r,
             index: recording::SampleIndexEncoder::new(),
             id,
             corrupt: false,
@@ -784,6 +756,7 @@ impl<'a> Writer<'a> {
             Err(e) => {
                 if remaining.len() < pkt.len() {
                     // Partially written packet. Truncate if possible.
+                    // TODO: have the syncer do this / retry it if necessary?
                     if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) {
                         error!("After write to {} failed with {}, truncate failed with {}; \
                                sample file is corrupt.", w.id, e, e2);
@@ -820,7 +793,7 @@ impl<'a> InnerWriter<'a> {
     fn close(mut self, next_pts: Option<i64>) -> Result<PreviousWriter, Error> {
         if self.corrupt {
-            self.syncer_channel.async_abandon_recording(self.id);
+            //self.syncer_channel.async_abandon_recording(self.id);
             bail!("recording {} is corrupt", self.id);
         }
         let unflushed =
@@ -839,8 +812,7 @@ impl<'a> InnerWriter<'a> {
         let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 }
                     else { 0 };
         let local_start_delta = self.local_start - start;
-        let recording = db::RecordingToInsert{
-            id: self.id,
+        let recording = db::RecordingToInsert {
             sample_file_bytes: self.index.sample_file_bytes,
             time: start .. end,
             local_time_delta: local_start_delta,
@@ -852,7 +824,8 @@ impl<'a> InnerWriter<'a> {
             run_offset: self.run_offset,
             flags: flags,
         };
-        self.syncer_channel.async_save_recording(recording, self.f);
+        self.r.lock().recording = Some(recording);
+        self.syncer_channel.async_save_recording(self.id, self.r, self.f);
         Ok(PreviousWriter{
             end_time: end,
             local_time_delta: local_start_delta,