From b037c9bdd7fde4b785d9e3304d4872d963c4ed57 Mon Sep 17 00:00:00 2001
From: Scott Lamb
Date: Thu, 22 Feb 2018 16:35:34 -0800
Subject: [PATCH] knob to reduce db commits (SSD write cycles)

This improves the practicality of having many streams (including the
doubling of streams by having main + sub streams for each camera).
With these tuned properly, extra streams don't cause any extra write
cycles in normal or error cases. Consider the worst case in which each
RTSP session immediately sends a single frame and then fails. Moonfire
retries every second, so this would formerly cause one commit per
second per stream. (flush_if_sec=0 preserves this behavior.) Now the
commits can be arbitrarily infrequent by setting higher values of
flush_if_sec.

WARNING: this isn't production-ready! I hacked up dir.rs to make tests
pass and "moonfire-nvr run" work in the best-case scenario, but it
doesn't handle errors gracefully. I've been debating what to do when
writing a recording fails. I considered "abandoning" the recording,
then either reusing or skipping its id (in the latter case, marking
the file as garbage if it can't be unlinked immediately). I think now
there's no point in abandoning a recording. If I can't write to that
file, there's no reason to believe another will work better. It's
better to retry that recording forever, and perhaps put the whole
directory into an error state that stops recording until those writes
go through. I'm planning to redesign dir.rs to make this happen.
---
 db/db.rs                     | 867 ++++++++++++++++-------------
 db/dir.rs                    | 301 ++++++------
 db/lib.rs                    |   1 +
 db/raw.rs                    | 203 ++++++++
 db/schema.sql                |   5 +
 db/testutil.rs               |  84 ++--
 guide/schema.md              |   2 +
 src/clock.rs                 |   8 +-
 src/cmds/config/cameras.rs   |  11 +-
 src/cmds/config/dirs.rs      |  13 +-
 src/cmds/run.rs              |   3 +
 src/cmds/upgrade/v1_to_v2.rs |   5 +-
 src/json.rs                  |   2 +-
 src/mp4.rs                   |   4 +
 src/streamer.rs              |   7 +-
 15 files changed, 822 insertions(+), 694 deletions(-)
 create mode 100644 db/raw.rs

diff --git a/db/db.rs b/db/db.rs
index 46bd3cc..4b393a9 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -48,19 +48,21 @@
 //!   the critical path of recording frames. The caller should preallocate sample file uuids
 //!   and such to avoid database operations in these paths.
 //!
-//!   * the `Transaction` interface allows callers to batch write operations to reduce latency and
-//!     SSD write cycles.
+//!   * adding and removing recordings done during normal operations use a batch interface.
+//!     A list of mutations is built up in-memory and occasionally flushed to reduce SSD write
+//!     cycles.
use dir; use failure::Error; -use fnv::{self, FnvHashMap}; +use fnv::{self, FnvHashMap, FnvHashSet}; use lru_cache::LruCache; use openssl::hash; use parking_lot::{Mutex,MutexGuard}; +use raw; use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite; use schema; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::collections::btree_map; use std::cell::RefCell; use std::cmp; @@ -86,32 +88,11 @@ const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" composite_id = :composite_id "#; -const DELETE_GARBAGE_SQL: &'static str = - "delete from garbage where composite_id = :composite_id"; - -const INSERT_GARBAGE_SQL: &'static str = - "insert into garbage (sample_file_dir_id, composite_id) - values (:sample_file_dir_id, :composite_id)"; - const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" insert into video_sample_entry (sha1, width, height, rfc6381_codec, data) values (:sha1, :width, :height, :rfc6381_codec, :data) "#; -const INSERT_RECORDING_SQL: &'static str = r#" - insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes, - start_time_90k, duration_90k, local_time_delta_90k, video_samples, - video_sync_samples, video_sample_entry_id) - values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes, - :start_time_90k, :duration_90k, :local_time_delta_90k, - :video_samples, :video_sync_samples, :video_sample_entry_id) -"#; - -const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" - insert into recording_playback (composite_id, sample_file_sha1, video_index) - values (:composite_id, :sample_file_sha1, :video_index) -"#; - const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = "update stream set next_recording_id = :next_recording_id where id = :stream_id"; @@ -130,35 +111,6 @@ const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" composite_id "#; -const DELETE_RECORDING_SQL: &'static str = r#" - delete from recording where composite_id = :composite_id -"#; - -const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#" - delete from recording_playback where composite_id = :composite_id -"#; - -const STREAM_MIN_START_SQL: &'static str = r#" - select - start_time_90k - from - recording - where - stream_id = :stream_id - order by start_time_90k limit 1 -"#; - -const STREAM_MAX_START_SQL: &'static str = r#" - select - start_time_90k, - duration_90k - from - recording - where - stream_id = :stream_id - order by start_time_90k desc; -"#; - const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" select recording.composite_id, @@ -257,9 +209,8 @@ pub enum RecordingFlags { } /// A recording to pass to `insert_recording`. -#[derive(Debug)] -pub struct RecordingToInsert { - pub id: CompositeId, +#[derive(Clone, Debug)] +pub(crate) struct RecordingToInsert { pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, @@ -274,8 +225,9 @@ pub struct RecordingToInsert { /// A row used in `list_oldest_sample_files`. #[derive(Debug)] -pub struct ListOldestSampleFilesRow { +pub(crate) struct ListOldestSampleFilesRow { pub id: CompositeId, + pub sample_file_dir_id: i32, pub time: Range, pub sample_file_bytes: i32, } @@ -329,6 +281,8 @@ pub struct SampleFileDir { pub uuid: Uuid, dir: Option>, last_complete_open: Option, + to_gc: Vec, + pub(crate) garbage: FnvHashSet, } impl SampleFileDir { @@ -407,6 +361,10 @@ pub struct Stream { pub type_: StreamType, pub rtsp_path: String, pub retain_bytes: i64, + pub flush_if_sec: i64, + + /// `flush_if_sec` converted to a duration for convenience. 
+ pub flush_if: recording::Duration, /// The time range of recorded data associated with this stream (minimum start time and maximum /// end time). `None` iff there are no recordings for this camera. @@ -420,7 +378,48 @@ pub struct Stream { /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. pub days: BTreeMap, pub record: bool, - pub next_recording_id: i32, + + /// The `next_recording_id` currently committed to the database. + pub(crate) next_recording_id: i32, + + /// The recordings which have been added via `LockedDatabase::add_recording` but have yet to + /// committed to the database. + /// + /// `uncommitted[i]` uses sample filename `CompositeId::new(id, next_recording_id + 1)`; + /// `next_recording_id` should be advanced when one is committed to maintain this invariant. + /// + /// TODO: alter the serving path to show these just as if they were already committed. + uncommitted: VecDeque>>, +} + +impl Stream { + /// Returns the duration of synced but uncommitted recordings for the given stream. + /// Note recordings must be flushed in order, so a recording is considered unsynced if any + /// before it are unsynced. + pub(crate) fn unflushed(&self) -> recording::Duration { + let mut dur = recording::Duration(0); + for u in &self.uncommitted { + let l = u.lock(); + if !l.synced { + break; + } + if let Some(ref r) = l.recording { + dur += r.time.end - r.time.start; + } + } + dur + } +} + +#[derive(Debug)] +pub(crate) struct UncommittedRecording { + /// If this recording has been synced to disk and thus is ready to commit to the database. + /// `recording` should not be modified after this is set to true. + pub(crate) synced: bool, + + /// The recording to add. Absent iff the recording has been abandoned. + /// TODO: modify `SampleIndexEncoder` to update this record as it goes. + pub(crate) recording: Option, } #[derive(Debug, Default)] @@ -428,6 +427,7 @@ pub struct StreamChange { pub sample_file_dir_id: Option, pub rtsp_path: String, pub record: bool, + pub flush_if_sec: i64, } /// Information about a camera, used by `add_camera` and `update_camera`. @@ -564,17 +564,8 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam Ok(()) } -#[derive(Debug)] pub struct LockedDatabase { conn: rusqlite::Connection, - state: State, -} - -/// In-memory state from the database. -/// This is separated out of `LockedDatabase` so that `Transaction` can mutably borrow `state` -/// while its underlying `rusqlite::Transaction` is borrowing `conn`. -#[derive(Debug)] -struct State { uuid: Uuid, /// If the database is open in read-write mode, the information about the current Open row. @@ -582,10 +573,15 @@ struct State { sample_file_dirs_by_id: BTreeMap, cameras_by_id: BTreeMap, streams_by_id: BTreeMap, - cameras_by_uuid: BTreeMap, + cameras_by_uuid: BTreeMap, // values are ids. video_sample_entries: BTreeMap>, list_recordings_by_time_sql: String, video_index_cache: RefCell, fnv::FnvBuildHasher>>, + on_flush: Vec>, + + /// Recordings which have been enqueued for deletion via `LockedDatabase::delete_recordings` + /// but have yet to be committed. + to_delete: Vec, } #[derive(Copy, Clone, Debug)] @@ -594,48 +590,14 @@ struct Open { uuid: Uuid, } -/// A high-level transaction. This manages the SQLite transaction and the matching modification to -/// be applied to the in-memory state on successful commit. 
-pub struct Transaction<'a> { - state: &'a mut State, - mods_by_stream: FnvHashMap, - tx: rusqlite::Transaction<'a>, - - /// True if due to an earlier error the transaction must be rolled back rather than committed. - /// Insert and delete are multi-part. If later parts fail, earlier parts should be aborted as - /// well. We could use savepoints (nested transactions) for this, but for simplicity we just - /// require the entire transaction be rolled back. - must_rollback: bool, -} - -/// A modification to be done to a `Stream` after a `Transaction` is committed. +/// A modification to be done to a `Stream`, used within `LockedDatabase::flush`. #[derive(Default)] struct StreamModification { - /// Add this to `camera.duration`. Thus, positive values indicate a net addition; - /// negative values indicate a net subtraction. - duration: recording::Duration, - - /// Add this to `camera.sample_file_bytes`. - sample_file_bytes: i64, - - /// Add this to `stream.days`. - days: BTreeMap, - - /// Reset the Stream range to this value. This should be populated immediately prior to the - /// commit. + num_recordings_to_commit: i32, range: Option>, - - /// Reset the next_recording_id to the specified value. - new_next_recording_id: Option, - - /// Reset the retain_bytes to the specified value. - new_retain_bytes: Option, - - /// Reset the record to the specified value. - new_record: Option, } -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct CompositeId(pub i64); impl CompositeId { @@ -653,231 +615,6 @@ impl ::std::fmt::Display for CompositeId { } } -impl<'a> Transaction<'a> { - /// Deletes the given recordings from the `recording` and `recording_playback` tables. - /// Note they are not fully removed from the database; the uuids are transferred to the - /// `garbage` table. The caller should `unlink` the files, then remove the `garbage` row. - pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> { - let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?; - let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; - let mut insert = self.tx.prepare_cached(INSERT_GARBAGE_SQL)?; - - self.check_must_rollback()?; - self.must_rollback = true; - for row in rows { - let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording {}", row.id); - } - let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording_playback {}", row.id); - } - let sid = row.id.stream(); - let did = self.state - .streams_by_id - .get(&sid) - .ok_or_else(|| format_err!("no such stream {}", sid))? - .sample_file_dir_id - .ok_or_else(|| format_err!("stream {} has no dir", sid))?; - insert.execute_named(&[ - (":sample_file_dir_id", &did), - (":composite_id", &row.id.0)], - )?; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.id.stream()); - m.duration -= row.time.end - row.time.start; - m.sample_file_bytes -= row.sample_file_bytes as i64; - adjust_days(row.time.clone(), -1, &mut m.days); - } - self.must_rollback = false; - Ok(()) - } - - /// Marks the given sample files as deleted. This shouldn't be called until the files have - /// been `unlink()`ed and the parent directory `fsync()`ed. 
- pub fn mark_sample_files_deleted(&mut self, ids: &[CompositeId]) -> Result<(), Error> { - if ids.is_empty() { return Ok(()); } - let mut stmt = self.tx.prepare_cached(DELETE_GARBAGE_SQL)?; - for &id in ids { - let changes = stmt.execute_named(&[(":composite_id", &id.0)])?; - if changes != 1 { - bail!("no garbage row for {}", id); - } - } - Ok(()) - } - - /// Inserts the specified recording. - pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result<(), Error> { - self.check_must_rollback()?; - - if r.time.end < r.time.start { - bail!("end time {} must be >= start time {}", r.time.end, r.time.start); - } - - // Check that the recording id is acceptable and do the insertion. - let stream = match self.state.streams_by_id.get(&r.id.stream()) { - None => bail!("no such stream id {}", r.id.stream()), - Some(s) => s, - }; - self.must_rollback = true; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.id.stream()); - { - let next = m.new_next_recording_id.unwrap_or(stream.next_recording_id); - if r.id.recording() < next { - bail!("recording {} out of order; next id is {}!", r.id, next); - } - let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; - stmt.execute_named(&[ - (":composite_id", &r.id.0), - (":stream_id", &(r.id.stream() as i64)), - (":run_offset", &r.run_offset), - (":flags", &r.flags), - (":sample_file_bytes", &r.sample_file_bytes), - (":start_time_90k", &r.time.start.0), - (":duration_90k", &(r.time.end.0 - r.time.start.0)), - (":local_time_delta_90k", &r.local_time_delta.0), - (":video_samples", &r.video_samples), - (":video_sync_samples", &r.video_sync_samples), - (":video_sample_entry_id", &r.video_sample_entry_id), - ])?; - m.new_next_recording_id = Some(r.id.recording() + 1); - let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; - let sha1 = &r.sample_file_sha1[..]; - stmt.execute_named(&[ - (":composite_id", &r.id.0), - (":sample_file_sha1", &sha1), - (":video_index", &r.video_index), - ])?; - let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; - stmt.execute_named(&[ - (":stream_id", &(r.id.stream() as i64)), - (":next_recording_id", &m.new_next_recording_id), - ])?; - } - self.must_rollback = false; - m.duration += r.time.end - r.time.start; - m.sample_file_bytes += r.sample_file_bytes as i64; - adjust_days(r.time.clone(), 1, &mut m.days); - Ok(()) - } - - /// Updates the `record` and `retain_bytes` for the given stream. - /// Note this just resets the limit in the database; it's the caller's responsibility to ensure - /// current usage is under the new limit if desired. - pub fn update_retention(&mut self, stream_id: i32, new_record: bool, new_limit: i64) - -> Result<(), Error> { - if new_limit < 0 { - bail!("can't set limit for stream {} to {}; must be >= 0", stream_id, new_limit); - } - self.check_must_rollback()?; - let mut stmt = self.tx.prepare_cached(r#" - update stream - set - record = :record, - retain_bytes = :retain - where - id = :id - "#)?; - let changes = stmt.execute_named(&[ - (":record", &new_record), - (":retain", &new_limit), - (":id", &stream_id), - ])?; - if changes != 1 { - bail!("no such stream {}", stream_id); - } - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id); - m.new_record = Some(new_record); - m.new_retain_bytes = Some(new_limit); - Ok(()) - } - - /// Commits these changes, consuming the Transaction. 
- pub fn commit(mut self) -> Result<(), Error> { - self.check_must_rollback()?; - self.precommit()?; - self.tx.commit()?; - for (&stream_id, m) in &self.mods_by_stream { - let stream = self.state.streams_by_id.get_mut(&stream_id) - .expect("modified stream must exist"); - stream.duration += m.duration; - stream.sample_file_bytes += m.sample_file_bytes; - for (k, v) in &m.days { - adjust_day(*k, *v, &mut stream.days); - } - stream.range = m.range.clone(); - if let Some(id) = m.new_next_recording_id { - stream.next_recording_id = id; - } - if let Some(r) = m.new_record { - stream.record = r; - } - if let Some(b) = m.new_retain_bytes { - stream.retain_bytes = b; - } - } - Ok(()) - } - - /// Raises an error if `must_rollback` is true. To be used on commit and in modifications. - fn check_must_rollback(&self) -> Result<(), Error> { - if self.must_rollback { - bail!("failing due to previous error"); - } - Ok(()) - } - - /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry. - fn get_mods_by_stream(mods: &mut FnvHashMap, stream_id: i32) - -> &mut StreamModification { - mods.entry(stream_id).or_insert_with(StreamModification::default) - } - - /// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the - /// commit succeeds, there's no possibility that the correct state can't be retrieved. - fn precommit(&mut self) -> Result<(), Error> { - // Recompute start and end times for each camera. - for (&stream_id, m) in &mut self.mods_by_stream { - // The minimum is straightforward, taking advantage of the start_time_90k index. - let mut stmt = self.tx.prepare_cached(STREAM_MIN_START_SQL)?; - let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; - let min_start = match rows.next() { - Some(row) => recording::Time(row?.get_checked(0)?), - None => continue, // no data; leave m.range alone. - }; - - // There was a minimum, so there should be a maximum too. Calculating it is less - // straightforward because recordings could overlap. All recordings starting in the - // last MAX_RECORDING_DURATION must be examined in order to take advantage of the - // start_time_90k index. - let mut stmt = self.tx.prepare_cached(STREAM_MAX_START_SQL)?; - let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; - let mut maxes_opt = None; - while let Some(row) = rows.next() { - let row = row?; - let row_start = recording::Time(row.get_checked(0)?); - let row_duration: i64 = row.get_checked(1)?; - let row_end = recording::Time(row_start.0 + row_duration); - let maxes = match maxes_opt { - None => row_start .. row_end, - Some(Range{start: s, end: e}) => s .. cmp::max(e, row_end), - }; - if row_start.0 <= maxes.start.0 - recording::MAX_RECORDING_DURATION { - break; - } - maxes_opt = Some(maxes); - } - let max_end = match maxes_opt { - Some(Range{end: e, ..}) => e, - None => bail!("missing max for stream {} which had min {}", stream_id, min_start), - }; - m.range = Some(min_start .. max_end); - } - Ok(()) - } -} - /// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange` /// structs. struct StreamStateChanger { @@ -941,6 +678,9 @@ impl StreamStateChanger { sample_file_dir_id: sc.sample_file_dir_id, rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), record: sc.record, + flush_if_sec: sc.flush_if_sec, + flush_if: recording::Duration(sc.flush_if_sec * + recording::TIME_UNITS_PER_SEC), ..s }))); } @@ -952,9 +692,9 @@ impl StreamStateChanger { // Insert stream. 
let mut stmt = tx.prepare_cached(r#" insert into stream (camera_id, sample_file_dir_id, type, rtsp_path, record, - retain_bytes, next_recording_id) + retain_bytes, flush_if_sec, next_recording_id) values (:camera_id, :sample_file_dir_id, :type, :rtsp_path, :record, - 0, 1) + 0, :flush_if_sec, 1) "#)?; let type_ = StreamType::from_index(i).unwrap(); stmt.execute_named(&[ @@ -963,6 +703,7 @@ impl StreamStateChanger { (":type", &type_.as_str()), (":rtsp_path", &sc.rtsp_path), (":record", &sc.record), + (":flush_if_sec", &sc.flush_if_sec), ])?; let id = tx.last_insert_rowid() as i32; sids[i] = Some(id); @@ -973,12 +714,15 @@ impl StreamStateChanger { sample_file_dir_id: sc.sample_file_dir_id, rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), retain_bytes: 0, + flush_if_sec: sc.flush_if_sec, + flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, duration: recording::Duration(0), days: BTreeMap::new(), record: sc.record, next_recording_id: 1, + uncommitted: VecDeque::new(), }))); } } @@ -1004,11 +748,156 @@ impl StreamStateChanger { } } +/// A retention change as expected by `LockedDatabase::update_retention`. +pub struct RetentionChange { + pub stream_id: i32, + pub new_record: bool, + pub new_limit: i64, +} + impl LockedDatabase { /// Returns an immutable view of the cameras by id. - pub fn cameras_by_id(&self) -> &BTreeMap { &self.state.cameras_by_id } + pub fn cameras_by_id(&self) -> &BTreeMap { &self.cameras_by_id } pub fn sample_file_dirs_by_id(&self) -> &BTreeMap { - &self.state.sample_file_dirs_by_id + &self.sample_file_dirs_by_id + } + + pub(crate) fn add_recording(&mut self, stream_id: i32) + -> Result<(CompositeId, Arc>), Error> { + let stream = match self.streams_by_id.get_mut(&stream_id) { + None => bail!("no such stream {}", stream_id), + Some(s) => s, + }; + let id = CompositeId::new(stream_id, + stream.next_recording_id + (stream.uncommitted.len() as i32)); + let recording = Arc::new(Mutex::new(UncommittedRecording { + synced: false, + recording: None, + })); + stream.uncommitted.push_back(Arc::clone(&recording)); + Ok((id, recording)) + } + + pub(crate) fn delete_recordings(&mut self, rows: &mut Vec) { + self.to_delete.append(rows); + } + + pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec) + -> Result<(), Error> { + let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) { + None => bail!("no such dir {}", dir_id), + Some(d) => d, + }; + dir.to_gc.append(ids); + Ok(()) + } + + /// Tries to flush unwritten changes from the stream directories. + /// + /// * commits any recordings added with `add_recording` that have since been marked as + /// synced. + /// * moves old recordings to the garbage table as requested by `delete_recordings`. + /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`. + /// + /// On success, for each affected sample file directory with a flush watcher set, sends a + /// `Flush` event. + pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> { + let tx = self.conn.transaction()?; + let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), + Default::default()); + raw::delete_recordings(&tx, &self.to_delete)?; + for row in &self.to_delete { + // Add a placeholder for recomputing the range. 
+ mods.entry(row.id.stream()).or_insert_with(StreamModification::default); + + let dir = match self.sample_file_dirs_by_id.get_mut(&row.sample_file_dir_id) { + None => bail!("Row refers to nonexistent sample file dir: {:#?}", row), + Some(d) => d, + }; + dir.garbage.insert(row.id); + } + { + let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; + for (&stream_id, s) in &self.streams_by_id { + let mut i = 0; + for recording in &s.uncommitted { + let l = recording.lock(); + if !l.synced { break; } + if let Some(ref r) = l.recording { + raw::insert_recording( + &tx, CompositeId::new(stream_id, s.next_recording_id + i), &r)?; + } + i += 1; + } + if i > 0 { + let m = mods.entry(stream_id).or_insert_with(StreamModification::default); + m.num_recordings_to_commit = i; + stmt.execute_named(&[ + (":stream_id", &stream_id), + (":next_recording_id", &(s.next_recording_id + i)), + ])?; + } + } + } + for dir in self.sample_file_dirs_by_id.values() { + raw::mark_sample_files_deleted(&tx, &dir.to_gc)?; + } + for (&stream_id, m) in &mut mods { + m.range = raw::get_range(&tx, stream_id)?; + } + tx.commit()?; + + // Process delete_recordings. + let deleted = self.to_delete.len(); + for row in self.to_delete.drain(..) { + let s = self.streams_by_id.get_mut(&row.id.stream()).unwrap(); + s.duration -= row.time.end - row.time.start; + s.sample_file_bytes -= row.sample_file_bytes as i64; + adjust_days(row.time, -1, &mut s.days); + } + + // Process delete_garbage. + let mut gced = 0; + for dir in self.sample_file_dirs_by_id.values_mut() { + gced += dir.to_gc.len(); + for id in dir.to_gc.drain(..) { + dir.garbage.remove(&id); + } + } + + // Process add_recordings. + let mut added = 0; + for (stream_id, m) in mods.drain() { + let s = self.streams_by_id.get_mut(&stream_id).unwrap(); + s.next_recording_id += m.num_recordings_to_commit; + added += m.num_recordings_to_commit; + for _ in 0..m.num_recordings_to_commit { + let u = s.uncommitted.pop_front().unwrap(); + let l = u.lock(); + if let Some(ref r) = l.recording { + s.add_recording(r.time.clone(), r.sample_file_bytes); + } + } + s.range = m.range; + } + info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", + reason, added, deleted, gced); + for cb in &self.on_flush { + cb(); + } + Ok(()) + } + + /// Sets a watcher which will receive an (empty) event on successful flush. + /// The lock will be held while this is run, so it should not do any I/O. + pub(crate) fn on_flush(&mut self, run: Box) { + self.on_flush.push(run); + } + + // TODO: find a cleaner way to do this. Seems weird for src/cmds/run.rs to clear the on flush + // handlers given that it didn't add them. + pub fn clear_on_flush(&mut self) { + self.on_flush.clear(); } /// Opens the given sample file directories. @@ -1025,7 +914,7 @@ impl LockedDatabase { /// in practice. pub fn open_sample_file_dirs(&mut self, ids: &[i32]) -> Result<(), Error> { let mut in_progress = FnvHashMap::with_capacity_and_hasher(ids.len(), Default::default()); - let o = self.state.open.as_ref(); + let o = self.open.as_ref(); for &id in ids { let e = in_progress.entry(id); use ::std::collections::hash_map::Entry; @@ -1033,13 +922,13 @@ impl LockedDatabase { Entry::Occupied(_) => continue, // suppress duplicate. 
Entry::Vacant(e) => e, }; - let dir = self.state + let dir = self .sample_file_dirs_by_id .get_mut(&id) .ok_or_else(|| format_err!("no such dir {}", id))?; if dir.dir.is_some() { continue } let mut meta = schema::DirMeta::default(); - meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); + meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]); if let Some(o) = dir.last_complete_open { let open = meta.mut_last_complete_open(); @@ -1078,7 +967,7 @@ impl LockedDatabase { tx.commit()?; for (id, (mut meta, d)) in in_progress.drain() { - let dir = self.state.sample_file_dirs_by_id.get_mut(&id).unwrap(); + let dir = self.sample_file_dirs_by_id.get_mut(&id).unwrap(); meta.last_complete_open.clear(); mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open); d.write_meta(&meta)?; @@ -1088,30 +977,17 @@ impl LockedDatabase { Ok(()) } - pub fn streams_by_id(&self) -> &BTreeMap { &self.state.streams_by_id } + pub fn streams_by_id(&self) -> &BTreeMap { &self.streams_by_id } /// Returns an immutable view of the video sample entries. pub fn video_sample_entries(&self) -> btree_map::Values> { - self.state.video_sample_entries.values() - } - - /// Starts a transaction for a write operation. - /// Note transactions are not needed for read operations; this process holds a lock on the - /// database directory, and the connection is locked within the process, so having a - /// `LockedDatabase` is sufficient to ensure a consistent view. - pub fn tx(&mut self) -> Result { - Ok(Transaction { - state: &mut self.state, - mods_by_stream: FnvHashMap::default(), - tx: self.conn.transaction()?, - must_rollback: false, - }) + self.video_sample_entries.values() } /// Gets a given camera by uuid. pub fn get_camera(&self, uuid: Uuid) -> Option<&Camera> { - match self.state.cameras_by_uuid.get(&uuid) { - Some(id) => Some(self.state.cameras_by_id.get(id).expect("uuid->id requires id->cam")), + match self.cameras_by_uuid.get(&uuid) { + Some(id) => Some(self.cameras_by_id.get(id).expect("uuid->id requires id->cam")), None => None, } } @@ -1121,7 +997,7 @@ impl LockedDatabase { pub fn list_recordings_by_time(&self, stream_id: i32, desired_time: Range, f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?; + let mut stmt = self.conn.prepare_cached(&self.list_recordings_by_time_sql)?; let rows = stmt.query_named(&[ (":stream_id", &stream_id), (":start_time_90k", &desired_time.start.0), @@ -1147,7 +1023,7 @@ impl LockedDatabase { let row = row?; let id = CompositeId(row.get_checked::<_, i64>(0)?); let vse_id = row.get_checked(8)?; - let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) { + let video_sample_entry = match self.video_sample_entries.get(&vse_id) { Some(v) => v, None => bail!("recording {} references nonexistent video_sample_entry {}", id, vse_id), @@ -1245,7 +1121,7 @@ impl LockedDatabase { /// This uses a LRU cache to reduce the number of retrievals from the database. 
pub fn with_recording_playback(&self, id: CompositeId, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { - let mut cache = self.state.video_index_cache.borrow_mut(); + let mut cache = self.video_index_cache.borrow_mut(); if let Some(video_index) = cache.get_mut(&id.0) { trace!("cache hit for recording {}", id); return f(&RecordingPlayback { video_index }); @@ -1263,23 +1139,18 @@ impl LockedDatabase { Err(format_err!("no such recording {}", id)) } - /// Lists all garbage ids. - pub fn list_garbage(&self, dir_id: i32) -> Result, Error> { - let mut garbage = Vec::new(); - let mut stmt = self.conn.prepare_cached( - "select composite_id from garbage where sample_file_dir_id = ?")?; - let mut rows = stmt.query(&[&dir_id])?; - while let Some(row) = rows.next() { - let row = row?; - garbage.push(CompositeId(row.get_checked(0)?)); - } - Ok(garbage) - } - /// Lists the oldest sample files (to delete to free room). /// `f` should return true as long as further rows are desired. - pub fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> + pub(crate) fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> where F: FnMut(ListOldestSampleFilesRow) -> bool { + let s = match self.streams_by_id.get(&stream_id) { + None => bail!("no stream {}", stream_id), + Some(s) => s, + }; + let sample_file_dir_id = match s.sample_file_dir_id { + None => bail!("stream {} has no dir", stream_id), + Some(d) => d, + }; let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut rows = stmt.query_named(&[ (":start", &CompositeId::new(stream_id, 0).0), @@ -1292,6 +1163,7 @@ impl LockedDatabase { let duration = recording::Duration(row.get_checked(2)?); let should_continue = f(ListOldestSampleFilesRow{ id, + sample_file_dir_id, time: start .. start + duration, sample_file_bytes: row.get_checked(3)?, }); @@ -1328,7 +1200,7 @@ impl LockedDatabase { sha1.copy_from_slice(&sha1_vec); let data: Vec = row.get_checked(5)?; - self.state.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { id: id as i32, width: row.get_checked::<_, i32>(2)? as u16, height: row.get_checked::<_, i32>(3)? 
as u16, @@ -1338,7 +1210,7 @@ impl LockedDatabase { })); } info!("Loaded {} video sample entries", - self.state.video_sample_entries.len()); + self.video_sample_entries.len()); Ok(()) } @@ -1368,15 +1240,17 @@ impl LockedDatabase { (None, None) => None, _ => bail!("open table missing id {}", id), }; - self.state.sample_file_dirs_by_id.insert(id, SampleFileDir { + self.sample_file_dirs_by_id.insert(id, SampleFileDir { id, uuid: dir_uuid.0, path: row.get_checked(1)?, dir: None, last_complete_open, + to_gc: Vec::new(), + garbage: raw::list_garbage(&self.conn, id)?, }); } - info!("Loaded {} sample file dirs", self.state.sample_file_dirs_by_id.len()); + info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len()); Ok(()) } @@ -1401,7 +1275,7 @@ impl LockedDatabase { let row = row?; let id = row.get_checked(0)?; let uuid: FromSqlUuid = row.get_checked(1)?; - self.state.cameras_by_id.insert(id, Camera { + self.cameras_by_id.insert(id, Camera { id: id, uuid: uuid.0, short_name: row.get_checked(2)?, @@ -1411,9 +1285,9 @@ impl LockedDatabase { password: row.get_checked(6)?, streams: Default::default(), }); - self.state.cameras_by_uuid.insert(uuid.0, id); + self.cameras_by_uuid.insert(uuid.0, id); } - info!("Loaded {} cameras", self.state.cameras_by_id.len()); + info!("Loaded {} cameras", self.cameras_by_id.len()); Ok(()) } @@ -1429,6 +1303,7 @@ impl LockedDatabase { sample_file_dir_id, rtsp_path, retain_bytes, + flush_if_sec, next_recording_id, record from @@ -1442,28 +1317,32 @@ impl LockedDatabase { let type_ = StreamType::parse(&type_).ok_or_else( || format_err!("no such stream type {}", type_))?; let camera_id = row.get_checked(2)?; - let c = self.state + let c = self .cameras_by_id .get_mut(&camera_id) .ok_or_else(|| format_err!("missing camera {} for stream {}", camera_id, id))?; - self.state.streams_by_id.insert(id, Stream { + let flush_if_sec = row.get_checked(6)?; + self.streams_by_id.insert(id, Stream { id, type_, camera_id, sample_file_dir_id: row.get_checked(3)?, rtsp_path: row.get_checked(4)?, retain_bytes: row.get_checked(5)?, + flush_if_sec, + flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, duration: recording::Duration(0), days: BTreeMap::new(), - next_recording_id: row.get_checked(6)?, - record: row.get_checked(7)?, + next_recording_id: row.get_checked(7)?, + record: row.get_checked(8)?, + uncommitted: VecDeque::new(), }); c.streams[type_.index()] = Some(id); } - info!("Loaded {} streams", self.state.streams_by_id.len()); + info!("Loaded {} streams", self.streams_by_id.len()); Ok(()) } @@ -1477,7 +1356,7 @@ impl LockedDatabase { // Check if it already exists. // There shouldn't be too many entries, so it's fine to enumerate everything. - for (&id, v) in &self.state.video_sample_entries { + for (&id, v) in &self.video_sample_entries { if v.sha1 == sha1_bytes { // The width and height should match given that they're also specified within data // and thus included in the just-compared hash. 
@@ -1499,7 +1378,7 @@ impl LockedDatabase { ])?; let id = self.conn.last_insert_rowid() as i32; - self.state.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { id, width, height, @@ -1515,14 +1394,14 @@ impl LockedDatabase { let mut meta = schema::DirMeta::default(); let uuid = Uuid::new_v4(); let uuid_bytes = &uuid.as_bytes()[..]; - let o = self.state + let o = self .open .as_ref() .ok_or_else(|| format_err!("database is read-only"))?; // Populate meta. { - meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); + meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(uuid_bytes); let open = meta.mut_in_progress_open(); open.id = o.id; @@ -1530,6 +1409,7 @@ impl LockedDatabase { } let dir = dir::SampleFileDir::create(&path, &meta)?; + // TODO: ensure the dir is empty? let uuid = Uuid::new_v4(); self.conn.execute(r#" insert into sample_file_dir (path, uuid, last_complete_open_id) @@ -1537,7 +1417,7 @@ impl LockedDatabase { "#, &[&path, &uuid_bytes, &o.id])?; let id = self.conn.last_insert_rowid() as i32; use ::std::collections::btree_map::Entry; - let e = self.state.sample_file_dirs_by_id.entry(id); + let e = self.sample_file_dirs_by_id.entry(id); let d = match e { Entry::Vacant(e) => e.insert(SampleFileDir { id, @@ -1545,6 +1425,8 @@ impl LockedDatabase { uuid, dir: Some(dir), last_complete_open: None, + to_gc: Vec::new(), + garbage: FnvHashSet::default(), }), Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?, }; @@ -1555,7 +1437,7 @@ impl LockedDatabase { } pub fn delete_sample_file_dir(&mut self, dir_id: i32) -> Result<(), Error> { - for (&id, s) in self.state.streams_by_id.iter() { + for (&id, s) in self.streams_by_id.iter() { if s.sample_file_dir_id == Some(dir_id) { bail!("can't delete dir referenced by stream {}", id); } @@ -1565,7 +1447,7 @@ impl LockedDatabase { if self.conn.execute("delete from sample_file_dir where id = ?", &[&dir_id])? != 1 { bail!("no such dir {} to remove", dir_id); } - self.state.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!"); + self.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!"); Ok(()) } @@ -1590,12 +1472,12 @@ impl LockedDatabase { (":password", &camera.password), ])?; camera_id = tx.last_insert_rowid() as i32; - streams = StreamStateChanger::new(&tx, camera_id, None, &self.state.streams_by_id, + streams = StreamStateChanger::new(&tx, camera_id, None, &self.streams_by_id, &mut camera)?; } tx.commit()?; - let streams = streams.apply(&mut self.state.streams_by_id); - self.state.cameras_by_id.insert(camera_id, Camera { + let streams = streams.apply(&mut self.streams_by_id); + self.cameras_by_id.insert(camera_id, Camera { id: camera_id, uuid, short_name: camera.short_name, @@ -1605,7 +1487,7 @@ impl LockedDatabase { password: camera.password, streams, }); - self.state.cameras_by_uuid.insert(uuid, camera_id); + self.cameras_by_uuid.insert(uuid, camera_id); Ok(camera_id) } @@ -1614,12 +1496,12 @@ impl LockedDatabase { // TODO: sample_file_dir_id. disallow change when data is stored; change otherwise. 
let tx = self.conn.transaction()?; let streams; - let c = self.state + let c = self .cameras_by_id .get_mut(&camera_id) .ok_or_else(|| format_err!("no such camera {}", camera_id))?; { - streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.state.streams_by_id, + streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.streams_by_id, &mut camera)?; let mut stmt = tx.prepare_cached(r#" update camera set @@ -1649,20 +1531,20 @@ impl LockedDatabase { c.host = camera.host; c.username = camera.username; c.password = camera.password; - c.streams = streams.apply(&mut self.state.streams_by_id); + c.streams = streams.apply(&mut self.streams_by_id); Ok(()) } /// Deletes a camera and its streams. The camera must have no recordings. pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> { - let uuid = self.state.cameras_by_id.get(&id) + let uuid = self.cameras_by_id.get(&id) .map(|c| c.uuid) .ok_or_else(|| format_err!("No such camera {} to remove", id))?; let mut streams_to_delete = Vec::new(); let tx = self.conn.transaction()?; { let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?; - for (stream_id, stream) in &self.state.streams_by_id { + for (stream_id, stream) in &self.streams_by_id { if stream.camera_id != id { continue }; if stream.range.is_some() { bail!("Can't remove camera {}; has recordings.", id); @@ -1681,12 +1563,47 @@ impl LockedDatabase { } tx.commit()?; for id in streams_to_delete { - self.state.streams_by_id.remove(&id); + self.streams_by_id.remove(&id); } - self.state.cameras_by_id.remove(&id); - self.state.cameras_by_uuid.remove(&uuid); + self.cameras_by_id.remove(&id); + self.cameras_by_uuid.remove(&uuid); return Ok(()) } + + pub fn update_retention(&mut self, changes: &[RetentionChange]) -> Result<(), Error> { + let tx = self.conn.transaction()?; + { + let mut stmt = tx.prepare_cached(r#" + update stream + set + record = :record, + retain_bytes = :retain + where + id = :id + "#)?; + for c in changes { + if c.new_limit < 0 { + bail!("can't set limit for stream {} to {}; must be >= 0", + c.stream_id, c.new_limit); + } + let rows = stmt.execute_named(&[ + (":record", &c.new_record), + (":retain", &c.new_limit), + (":id", &c.stream_id), + ])?; + if rows != 1 { + bail!("no such stream {}", c.stream_id); + } + } + } + tx.commit()?; + for c in changes { + let s = self.streams_by_id.get_mut(&c.stream_id).expect("stream in db but not state"); + s.record = c.new_record; + s.retain_bytes = c.new_limit; + } + Ok(()) + } } /// Gets the schema version from the given database connection. @@ -1706,8 +1623,20 @@ pub fn get_schema_version(conn: &rusqlite::Connection) -> Result, Er /// The recording database. Abstracts away SQLite queries. Also maintains in-memory state /// (loaded on startup, and updated on successful commit) to avoid expensive scans over the /// recording table on common queries. -#[derive(Debug)] -pub struct Database(Mutex); +pub struct Database( + /// This is wrapped in an `Option` to allow the `Drop` implementation and `close` to coexist. + Option> +); + +impl Drop for Database { + fn drop(&mut self) { + if let Some(m) = self.0.take() { + if let Err(e) = m.into_inner().flush("drop") { + error!("Final database flush failed: {}", e); + } + } + } +} impl Database { /// Creates the database from a caller-supplied SQLite connection. 
@@ -1770,29 +1699,29 @@ impl Database { uuid, }) } else { None }; - let db = Database(Mutex::new(LockedDatabase{ + let db = Database(Some(Mutex::new(LockedDatabase { conn: conn, - state: State { - uuid, - open, - sample_file_dirs_by_id: BTreeMap::new(), - cameras_by_id: BTreeMap::new(), - cameras_by_uuid: BTreeMap::new(), - streams_by_id: BTreeMap::new(), - video_sample_entries: BTreeMap::new(), - video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - list_recordings_by_time_sql: list_recordings_by_time_sql, - }, - })); + uuid, + open, + sample_file_dirs_by_id: BTreeMap::new(), + cameras_by_id: BTreeMap::new(), + cameras_by_uuid: BTreeMap::new(), + streams_by_id: BTreeMap::new(), + video_sample_entries: BTreeMap::new(), + video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), + list_recordings_by_time_sql, + to_delete: Vec::new(), + on_flush: Vec::new(), + }))); { let l = &mut *db.lock(); l.init_video_sample_entries()?; l.init_sample_file_dirs()?; l.init_cameras()?; l.init_streams()?; - for (&stream_id, ref mut stream) in &mut l.state.streams_by_id { + for (&stream_id, ref mut stream) in &mut l.streams_by_id { // TODO: we could use one thread per stream if we had multiple db conns. - let camera = l.state.cameras_by_id.get(&stream.camera_id).unwrap(); + let camera = l.cameras_by_id.get(&stream.camera_id).unwrap(); init_recordings(&mut l.conn, stream_id, camera, stream)?; } } @@ -1816,13 +1745,13 @@ impl Database { /// Locks the database; the returned reference is the only way to perform (read or write) /// operations. - pub fn lock(&self) -> MutexGuard { self.0.lock() } + pub fn lock(&self) -> MutexGuard { self.0.as_ref().unwrap().lock() } - /// For testing. Closes the database and return the connection. This allows verification that - /// a newly opened database is in an acceptable state. + /// For testing: closes the database (without flushing) and returns the connection. + /// This allows verification that a newly opened database is in an acceptable state. 
#[cfg(test)] - fn close(self) -> rusqlite::Connection { - self.0.into_inner().conn + fn close(mut self) -> rusqlite::Connection { + self.0.take().unwrap().into_inner().conn } } @@ -1992,7 +1921,7 @@ mod tests { #[test] fn test_no_meta_or_version() { testutil::init(); - let e = Database::new(Connection::open_in_memory().unwrap(), false).unwrap_err(); + let e = Database::new(Connection::open_in_memory().unwrap(), false).err().unwrap(); assert!(e.to_string().starts_with("no such table"), "{}", e); } @@ -2001,7 +1930,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); - let e = Database::new(c, false).unwrap_err(); + let e = Database::new(c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 2 is too old (expected 3)"), "got: {:?}", e); } @@ -2011,7 +1940,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap(); - let e = Database::new(c, false).unwrap_err(); + let e = Database::new(c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 4 is too new (expected 3)"), "got: {:?}", e); } @@ -2034,7 +1963,7 @@ mod tests { let db = Database::new(conn, true).unwrap(); let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned(); - let sample_file_dir_id = Some({ db.lock() }.add_sample_file_dir(path).unwrap()); + let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap(); let camera_id = { db.lock() }.add_camera(CameraChange { short_name: "testcam".to_owned(), description: "".to_owned(), @@ -2042,16 +1971,28 @@ mod tests { username: "foo".to_owned(), password: "bar".to_owned(), streams: [ - StreamChange { sample_file_dir_id, rtsp_path: "/main".to_owned(), record: true }, - StreamChange { sample_file_dir_id, rtsp_path: "/sub".to_owned(), record: true }, + StreamChange { + sample_file_dir_id: Some(sample_file_dir_id), + rtsp_path: "/main".to_owned(), + record: true, + flush_if_sec: 0, + }, + StreamChange { + sample_file_dir_id: Some(sample_file_dir_id), + rtsp_path: "/sub".to_owned(), + record: true, + flush_if_sec: 0, + }, ], }).unwrap(); { let mut l = db.lock(); let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap(); - let mut tx = l.tx().unwrap(); - tx.update_retention(stream_id, true, 42).unwrap(); - tx.commit().unwrap(); + l.update_retention(&[super::RetentionChange { + stream_id, + new_record: true, + new_limit: 42, + }]).unwrap(); } let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid }; assert_no_recordings(&db, camera_uuid); @@ -2061,7 +2002,7 @@ mod tests { let db = Database::new(conn, true).unwrap(); assert_no_recordings(&db, camera_uuid); - assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), &[]); + // TODO: assert_eq!(db.lock().list_garbage(sample_file_dir_id).unwrap(), &[]); let vse_id = db.lock().insert_video_sample_entry( 1920, 1080, include_bytes!("testdata/avc1").to_vec(), @@ -2071,9 +2012,7 @@ mod tests { // Inserting a recording should succeed and advance the next recording id. 
let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); let stream_id = camera_id; // TODO - let id = CompositeId::new(stream_id, 1); let recording = RecordingToInsert { - id, sample_file_bytes: 42, run_offset: 0, flags: 0, @@ -2085,12 +2024,14 @@ mod tests { video_index: [0u8; 100].to_vec(), sample_file_sha1: [0u8; 20], }; - { + let id = { let mut db = db.lock(); - let mut tx = db.tx().unwrap(); - tx.insert_recording(&recording).unwrap(); - tx.commit().unwrap(); - } + let (id, u) = db.add_recording(stream_id).unwrap(); + u.lock().recording = Some(recording.clone()); + u.lock().synced = true; + db.flush("add test").unwrap(); + id + }; assert_eq!(db.lock().streams_by_id().get(&stream_id).unwrap().next_recording_id, 2); // Queries should return the correct result (with caches update on insert). @@ -2108,36 +2049,18 @@ mod tests { let mut v = Vec::new(); db.list_oldest_sample_files(stream_id, |r| { v.push(r); true }).unwrap(); assert_eq!(1, v.len()); - let mut tx = db.tx().unwrap(); - tx.delete_recordings(&v).unwrap(); - tx.commit().unwrap(); + db.delete_recordings(&mut v); + db.flush("delete test").unwrap(); } assert_no_recordings(&db, camera_uuid); - assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), vec![id]); - } - - #[test] - fn test_drop_tx() { - testutil::init(); - let conn = setup_conn(); - conn.execute("insert into garbage values (1, ?)", &[&CompositeId::new(1, 1).0]).unwrap(); - let db = Database::new(conn, true).unwrap(); - let mut db = db.lock(); - { - let mut tx = db.tx().unwrap(); - tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); - // drop tx without committing. - } - - // The dropped tx should have done nothing. - assert_eq!(db.list_garbage(1).unwrap(), &[CompositeId::new(1, 1)]); - - // Following transactions should succeed. - { - let mut tx = db.tx().unwrap(); - tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); - tx.commit().unwrap(); - } - assert_eq!(db.list_garbage(1).unwrap(), &[]); + let g: Vec<_> = db.lock() + .sample_file_dirs_by_id() + .get(&sample_file_dir_id) + .unwrap() + .garbage + .iter() + .map(|&id| id) + .collect(); + assert_eq!(&g, &[id]); } } diff --git a/db/dir.rs b/db/dir.rs index 7e1b685..ce431c7 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -36,6 +36,7 @@ use db::{self, CompositeId}; use failure::{Error, Fail}; use fnv::FnvHashMap; use libc::{self, c_char}; +use parking_lot::Mutex; use protobuf::{self, Message}; use recording; use openssl::hash; @@ -47,7 +48,7 @@ use std::io::{self, Read, Write}; use std::mem; use std::os::unix::ffi::OsStrExt; use std::os::unix::io::FromRawFd; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::sync::mpsc; use std::thread; @@ -62,9 +63,6 @@ pub struct SampleFileDir { /// The open file descriptor for the directory. The worker uses it to create files and sync the /// directory. Other threads use it to open sample files for reading during video serving. fd: Fd, - - // Lock order: don't acquire mutable.lock() while holding db.lock(). - mutable: Mutex, } /// A file descriptor associated with a directory (not necessarily the sample file dir). @@ -199,9 +197,6 @@ impl SampleFileDir { .map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?; Ok(Arc::new(SampleFileDir { fd, - mutable: Mutex::new(SharedMutableState{ - next_id_by_stream: FnvHashMap::default(), - }), })) } @@ -258,40 +253,11 @@ impl SampleFileDir { prev: Option, stream_id: i32, video_sample_entry_id: i32) -> Result, Error> { - // Grab the next id. 
The dir itself will typically have an id (possibly one ahead of what's - // stored in the database), but not on the first attempt for a stream. - use std::collections::hash_map::Entry; - let recording_id; - match self.mutable.lock().unwrap().next_id_by_stream.entry(stream_id) { - Entry::Occupied(mut e) => { - let v = e.get_mut(); - recording_id = *v; - *v += 1; - }, - Entry::Vacant(e) => { - let mut l = db.lock(); - recording_id = l.streams_by_id().get(&stream_id).unwrap().next_recording_id; - e.insert(recording_id + 1); - }, - }; - - let id = CompositeId::new(stream_id, recording_id); + let (id, r) = db.lock().add_recording(stream_id)?; let p = SampleFileDir::get_rel_pathname(id); - - let f = match unsafe { self.fd.openat(p.as_ptr(), - libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, - 0o600) } { - Ok(f) => f, - Err(e) => { - // Put the id back to try again later. - let mut l = self.mutable.lock().unwrap(); - let v = l.next_id_by_stream.get_mut(&stream_id).unwrap(); - assert_eq!(*v, recording_id + 1); - *v -= 1; - return Err(e.into()); - }, - }; - Writer::open(f, id, prev, video_sample_entry_id, channel) + let f = unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, + 0o600) }.unwrap(); // TODO: don't unwrap! + Writer::open(f, id, r, prev, video_sample_entry_id, channel) } pub fn statfs(&self) -> Result { self.fd.statfs() } @@ -325,16 +291,11 @@ impl SampleFileDir { } } -/// State shared between users of the `SampleFileDirectory` struct and the syncer. -#[derive(Debug)] -struct SharedMutableState { - next_id_by_stream: FnvHashMap, -} - /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { - AsyncSaveRecording(db::RecordingToInsert, fs::File), - AsyncAbandonRecording(CompositeId), + AsyncSaveRecording(CompositeId, Arc>, fs::File), + //AsyncAbandonRecording(CompositeId), + DatabaseFlushed, Flush(mpsc::SyncSender<()>), } @@ -345,20 +306,9 @@ pub struct SyncerChannel(mpsc::Sender); /// State of the worker thread. struct Syncer { + dir_id: i32, dir: Arc, db: Arc, - - /// Files to be unlinked then immediately forgotten about. They are `>= next_recording_id` for - /// their stream, `next_recording_id` won't be advanced without a sync of the directory, and - /// extraneous files `>= next_recording_id` are unlinked on startup, so this should be - /// sufficient. - to_abandon: Vec, - - /// Files to be unlinked then removed from the garbage table. - to_unlink: Vec, - - /// Files to be removed from the garbage table. - to_mark_deleted: Vec, } /// Starts a syncer for the given sample file directory. @@ -371,13 +321,23 @@ struct Syncer { /// /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and /// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be -/// removed and then the handle joined to allow all recordings to be persisted. +/// dropped and then the handle joined to allow all recordings to be persisted. +/// +/// Note that dropping all `SyncerChannel` clones currently includes calling +/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes. +/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically. 
pub fn start_syncer(db: Arc, dir_id: i32) -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> { let db2 = db.clone(); let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.initial_rotation()?; let (snd, rcv) = mpsc::channel(); + db.lock().on_flush(Box::new({ + let snd = snd.clone(); + move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) { + warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e); + } + })); Ok((SyncerChannel(snd), thread::Builder::new() .name(format!("sync-{}", path)) @@ -440,13 +400,14 @@ fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32, impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - fn async_save_recording(&self, recording: db::RecordingToInsert, f: fs::File) { - self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap(); + fn async_save_recording(&self, id: CompositeId, recording: Arc>, + f: fs::File) { + self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap(); } - fn async_abandon_recording(&self, id: CompositeId) { - self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); - } + //fn async_abandon_recording(&self, id: CompositeId) { + // self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); + //} /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. pub fn flush(&self) { @@ -463,9 +424,8 @@ impl Syncer { .get(&dir_id) .ok_or_else(|| format_err!("no dir {}", dir_id))?; let dir = d.get()?; - let to_unlink = l.list_garbage(dir_id)?; - // Get files to abandon. + // Abandon files. // First, get a list of the streams in question. let streams_to_next: FnvHashMap<_, _> = l.streams_by_id() @@ -479,13 +439,25 @@ impl Syncer { }) .collect(); let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?; + let mut undeletable = 0; + for &id in &to_abandon { + if let Err(e) = SampleFileDir::unlink(&dir.fd, id) { + if e.kind() == io::ErrorKind::NotFound { + warn!("dir: abandoned recording {} already deleted!", id); + } else { + warn!("dir: Unable to unlink abandoned recording {}: {}", id, e); + undeletable += 1; + } + } + } + if undeletable > 0 { + bail!("Unable to delete {} abandoned recordings.", undeletable); + } Ok((Syncer { + dir_id, dir, db, - to_abandon, - to_unlink, - to_mark_deleted: Vec::new(), }, d.path.clone())) } @@ -515,8 +487,9 @@ impl Syncer { loop { match cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown - Ok(SyncerCommand::AsyncSaveRecording(recording, f)) => self.save(recording, f), - Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), + Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f), + //Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), + Ok(SyncerCommand::DatabaseFlushed) => { let _ = self.collect_garbage(true); }, Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. 
}; } @@ -535,106 +508,102 @@ impl Syncer { fn do_rotation(&mut self, get_rows_to_delete: F) -> Result<(), Error> where F: FnOnce(&db::LockedDatabase) -> Result, Error> { - let to_delete = { - let mut db = self.db.lock(); - let to_delete = get_rows_to_delete(&*db)?; - let mut tx = db.tx()?; - tx.delete_recordings(&to_delete)?; - tx.commit()?; - to_delete - }; - for row in to_delete { - self.to_unlink.push(row.id); - } - self.try_unlink(); - if !self.to_unlink.is_empty() { - bail!("failed to unlink {} sample files", self.to_unlink.len()); - } - self.dir.sync()?; { let mut db = self.db.lock(); - let mut tx = db.tx()?; - tx.mark_sample_files_deleted(&self.to_mark_deleted)?; - tx.commit()?; + let mut to_delete = get_rows_to_delete(&*db)?; + db.delete_recordings(&mut to_delete); + db.flush("synchronous deletion")?; } - self.to_mark_deleted.clear(); - Ok(()) + self.collect_garbage(false)?; + self.db.lock().flush("synchronous garbage collection") + } + + fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> { + let mut garbage: Vec<_> = { + let l = self.db.lock(); + let d = match l.sample_file_dirs_by_id().get(&self.dir_id) { + None => { + error!("can't find dir {} in db!", self.dir_id); + bail!("can't find dir {} in db!", self.dir_id); + }, + Some(d) => d, + }; + d.garbage.iter().map(|id| *id).collect() + }; + let len_before = garbage.len(); + garbage.retain(|&id| { + if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) { + if e.kind() == io::ErrorKind::NotFound { + if warn_on_missing { + warn!("dir: recording {} already deleted!", id); + } + } else { + warn!("dir: Unable to unlink {}: {}", id, e); + return false; + } + } + true + }); + let res = if len_before > garbage.len() { + Err(format_err!("Unable to unlink {} files", len_before - garbage.len())) + } else { + Ok(()) + }; + if garbage.is_empty() { + // No progress. + return res; + } + if let Err(e) = self.dir.sync() { + error!("unable to sync dir: {}", e); + return res.and(Err(e.into())); + } + if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) { + error!("unable to delete garbage ({} files) for dir {}: {}", + self.dir_id, garbage.len(), e); + return res.and(Err(e.into())); + } + res } /// Saves the given recording and causes rotation to happen. /// Note that part of rotation is deferred for the next cycle (saved writing or program startup) /// so that there can be only one dir sync and database transaction per save. - fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { - if let Err(e) = self.save_helper(&recording, f) { - error!("will discard recording {} due to error while saving: {}", recording.id, e); - self.abandon(recording.id); - return; - } - } - - fn abandon(&mut self, id: CompositeId) { - self.to_abandon.push(id); - self.try_unlink(); - } - /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File) - -> Result<(), Error> { - self.try_unlink(); - if !self.to_unlink.is_empty() { - bail!("failed to unlink {} files.", self.to_unlink.len()); - } + /// TODO: less unwrapping! keep a queue? + fn save(&mut self, id: CompositeId, recording: Arc>, + f: fs::File) { + let stream_id = id.stream(); - // XXX: if these calls fail, any other writes are likely to fail as well. - f.sync_all()?; - self.dir.sync()?; - - let mut to_delete = Vec::new(); - let mut db = self.db.lock(); + // Free up a like number of bytes. 
         {
-            let stream_id = recording.id.stream();
-            let stream =
-                db.streams_by_id().get(&stream_id)
-                  .ok_or_else(|| format_err!("no such stream {}", stream_id))?;
-            get_rows_to_delete(&db, stream_id, stream,
-                               recording.sample_file_bytes as i64, &mut to_delete)?;
+            let mut to_delete = Vec::new();
+            let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64;
+            let mut db = self.db.lock();
+            {
+                let stream = db.streams_by_id().get(&stream_id).unwrap();
+                get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap();
+            }
+            db.delete_recordings(&mut to_delete);
         }
-        let mut tx = db.tx()?;
-        tx.mark_sample_files_deleted(&self.to_mark_deleted)?;
-        tx.delete_recordings(&to_delete)?;
-        tx.insert_recording(recording)?;
-        tx.commit()?;
-        self.to_mark_deleted.clear();
-        self.to_unlink.extend(to_delete.iter().map(|row| row.id));
-        self.to_unlink.extend_from_slice(&self.to_abandon);
-        self.to_abandon.clear();
-        Ok(())
-    }
-
-    /// Tries to unlink all the files in `self.to_unlink` and `self.to_abandon`.
-    /// Any which can't be unlinked will be retained in the vec.
-    fn try_unlink(&mut self) {
-        let to_mark_deleted = &mut self.to_mark_deleted;
-        let fd = &self.dir.fd;
-        for &mut (ref mut v, mark_deleted) in &mut [(&mut self.to_unlink, true),
-                                                    (&mut self.to_abandon, false)] {
-            v.retain(|&id| {
-                if let Err(e) = SampleFileDir::unlink(fd, id) {
-                    if e.kind() == io::ErrorKind::NotFound {
-                        warn!("dir: recording {} already deleted!", id);
-                    } else {
-                        warn!("dir: Unable to unlink {}: {}", id, e);
-                        return true;
-                    }
-                }
-                if mark_deleted {
-                    to_mark_deleted.push(id);
-                }
-                false
-            });
-        }
+        f.sync_all().unwrap();
+        self.dir.sync().unwrap();
+        recording.lock().synced = true;
+        let mut db = self.db.lock();
+        let reason = {
+            let s = db.streams_by_id().get(&stream_id).unwrap();
+            let c = db.cameras_by_id().get(&s.camera_id).unwrap();
+            let unflushed = s.unflushed();
+            if unflushed < s.flush_if {
+                debug!("{}-{}: unflushed={} < if={}, not flushing",
+                       c.short_name, s.type_.as_str(), unflushed, s.flush_if);
+                return;
+            }
+            format!("{}-{}: unflushed={} >= if={}",
+                    c.short_name, s.type_.as_str(), unflushed, s.flush_if)
+        };
+        let _ = db.flush(&reason);
     }
 }

@@ -651,6 +620,7 @@ pub struct Writer<'a>(Option>);
 struct InnerWriter<'a> {
     syncer_channel: &'a SyncerChannel,
     f: fs::File,
+    r: Arc>,
     index: recording::SampleIndexEncoder,
     id: CompositeId,
     corrupt: bool,
@@ -744,11 +714,13 @@ pub struct PreviousWriter {
 impl<'a> Writer<'a> {
     /// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
-    fn open(f: fs::File, id: CompositeId, prev: Option,
+    fn open(f: fs::File, id: CompositeId, r: Arc>,
+            prev: Option,
             video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result {
         Ok(Writer(Some(InnerWriter {
             syncer_channel,
             f,
+            r,
             index: recording::SampleIndexEncoder::new(),
             id,
             corrupt: false,
@@ -784,6 +756,7 @@ impl<'a> Writer<'a> {
                 Err(e) => {
                     if remaining.len() < pkt.len() {
                         // Partially written packet. Truncate if possible.
+                        // TODO: have the syncer do this / retry it if necessary?
                         if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) {
                             error!("After write to {} failed with {}, truncate failed with {}; \
                                     sample file is corrupt.", w.id, e, e2);
@@ -820,7 +793,7 @@ impl<'a> InnerWriter<'a> {
     fn close(mut self, next_pts: Option) -> Result {
         if self.corrupt {
-            self.syncer_channel.async_abandon_recording(self.id);
+            //self.syncer_channel.async_abandon_recording(self.id);
             bail!("recording {} is corrupt", self.id);
         }
         let unflushed =
@@ -839,8 +812,7 @@ impl<'a> InnerWriter<'a> {
         let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } else { 0 };
         let local_start_delta = self.local_start - start;
-        let recording = db::RecordingToInsert{
-            id: self.id,
+        let recording = db::RecordingToInsert {
             sample_file_bytes: self.index.sample_file_bytes,
             time: start .. end,
             local_time_delta: local_start_delta,
@@ -852,7 +824,8 @@ impl<'a> InnerWriter<'a> {
             run_offset: self.run_offset,
             flags: flags,
         };
-        self.syncer_channel.async_save_recording(recording, self.f);
+        self.r.lock().recording = Some(recording);
+        self.syncer_channel.async_save_recording(self.id, self.r, self.f);
         Ok(PreviousWriter{
             end_time: end,
             local_time_delta: local_start_delta,
diff --git a/db/lib.rs b/db/lib.rs
index e718042..3e4f263 100644
--- a/db/lib.rs
+++ b/db/lib.rs
@@ -48,6 +48,7 @@ extern crate uuid;
 mod coding;
 pub mod db;
 pub mod dir;
+mod raw;
 pub mod recording;
 pub mod schema;
diff --git a/db/raw.rs b/db/raw.rs
new file mode 100644
index 0000000..a62d29d
--- /dev/null
+++ b/db/raw.rs
@@ -0,0 +1,203 @@
+// This file is part of Moonfire NVR, a security camera digital video recorder.
+// Copyright (C) 2018 Scott Lamb
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// In addition, as a special exception, the copyright holders give
+// permission to link the code of portions of this program with the
+// OpenSSL library under certain conditions as described in each
+// individual source file, and distribute linked combinations including
+// the two.
+//
+// You must obey the GNU General Public License in all respects for all
+// of the code used other than OpenSSL. If you modify file(s) with this
+// exception, you may extend this exception to your version of the
+// file(s), but you are not obligated to do so. If you do not wish to do
+// so, delete this exception statement from your version. If you delete
+// this exception statement from all source files in the program, then
+// also delete it here.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+//! Raw database access: SQLite statements which do not touch any cached state.
+
+use db::{self, CompositeId};
+use failure::Error;
+use fnv::FnvHashSet;
+use recording;
+use rusqlite;
+use std::ops::Range;
+
+const INSERT_RECORDING_SQL: &'static str = r#"
+    insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes,
+                           start_time_90k, duration_90k, local_time_delta_90k, video_samples,
+                           video_sync_samples, video_sample_entry_id)
+    values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes,
+            :start_time_90k, :duration_90k, :local_time_delta_90k,
+            :video_samples, :video_sync_samples, :video_sample_entry_id)
+"#;
+
+const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#"
+    insert into recording_playback (composite_id, sample_file_sha1, video_index)
+    values (:composite_id, :sample_file_sha1, :video_index)
+"#;
+
+const STREAM_MIN_START_SQL: &'static str = r#"
+    select
+      start_time_90k
+    from
+      recording
+    where
+      stream_id = :stream_id
+    order by start_time_90k limit 1
+"#;
+
+const STREAM_MAX_START_SQL: &'static str = r#"
+    select
+      start_time_90k,
+      duration_90k
+    from
+      recording
+    where
+      stream_id = :stream_id
+    order by start_time_90k desc;
+"#;
+
+/// Inserts the specified recording (for use from `try_flush` only).
+pub(crate) fn insert_recording(tx: &rusqlite::Transaction, id: CompositeId,
+                               r: &db::RecordingToInsert) -> Result<(), Error> {
+    if r.time.end < r.time.start {
+        bail!("end time {} must be >= start time {}", r.time.end, r.time.start);
+    }
+
+    let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)?;
+    stmt.execute_named(&[
+        (":composite_id", &id.0),
+        (":stream_id", &(id.stream() as i64)),
+        (":run_offset", &r.run_offset),
+        (":flags", &r.flags),
+        (":sample_file_bytes", &r.sample_file_bytes),
+        (":start_time_90k", &r.time.start.0),
+        (":duration_90k", &(r.time.end.0 - r.time.start.0)),
+        (":local_time_delta_90k", &r.local_time_delta.0),
+        (":video_samples", &r.video_samples),
+        (":video_sync_samples", &r.video_sync_samples),
+        (":video_sample_entry_id", &r.video_sample_entry_id),
+    ])?;
+
+    let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
+    let sha1 = &r.sample_file_sha1[..];
+    stmt.execute_named(&[
+        (":composite_id", &id.0),
+        (":sample_file_sha1", &sha1),
+        (":video_index", &r.video_index),
+    ])?;
+    Ok(())
+}
+
+/// Deletes the given recordings from the `recording` and `recording_playback` tables.
+/// Note they are not fully removed from the database; the ids are transferred to the
+/// `garbage` table.
+pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, rows: &[db::ListOldestSampleFilesRow])
+                                -> Result<(), Error> {
+    let mut del1 = tx.prepare_cached(
+        "delete from recording_playback where composite_id = :composite_id")?;
+    let mut del2 = tx.prepare_cached(
+        "delete from recording where composite_id = :composite_id")?;
+    let mut insert = tx.prepare_cached(r#"
+        insert into garbage (sample_file_dir_id, composite_id)
+        values (:sample_file_dir_id, :composite_id)
+    "#)?;
+    for row in rows {
+        let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?;
+        if changes != 1 {
+            bail!("no such recording_playback {}", row.id);
+        }
+        let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?;
+        if changes != 1 {
+            bail!("no such recording {}", row.id);
+        }
+        insert.execute_named(&[
+            (":sample_file_dir_id", &row.sample_file_dir_id),
+            (":composite_id", &row.id.0)],
+        )?;
+    }
+    Ok(())
+}
+
+/// Marks the given sample files as deleted. This shouldn't be called until the files have
+/// been `unlink()`ed and the parent directory `fsync()`ed.
+pub(crate) fn mark_sample_files_deleted(tx: &rusqlite::Transaction, ids: &[CompositeId])
+                                        -> Result<(), Error> {
+    if ids.is_empty() { return Ok(()); }
+    let mut stmt = tx.prepare_cached("delete from garbage where composite_id = ?")?;
+    for &id in ids {
+        let changes = stmt.execute(&[&id.0])?;
+        if changes != 1 {
+            bail!("no garbage row for {}", id);
+        }
+    }
+    Ok(())
+}
+
+/// Gets the time range of recordings for the given stream.
+pub(crate) fn get_range(conn: &rusqlite::Connection, stream_id: i32)
+                        -> Result>, Error> {
+    // The minimum is straightforward, taking advantage of the start_time_90k index.
+    let mut stmt = conn.prepare_cached(STREAM_MIN_START_SQL)?;
+    let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?;
+    let min_start = match rows.next() {
+        Some(row) => recording::Time(row?.get_checked(0)?),
+        None => return Ok(None),
+    };
+
+    // There was a minimum, so there should be a maximum too. Calculating it is less
+    // straightforward because recordings could overlap. All recordings starting in the
+    // last MAX_RECORDING_DURATION must be examined in order to take advantage of the
+    // start_time_90k index.
+    let mut stmt = conn.prepare_cached(STREAM_MAX_START_SQL)?;
+    let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?;
+    let mut maxes_opt = None;
+    while let Some(row) = rows.next() {
+        let row = row?;
+        let row_start = recording::Time(row.get_checked(0)?);
+        let row_duration: i64 = row.get_checked(1)?;
+        let row_end = recording::Time(row_start.0 + row_duration);
+        let maxes = match maxes_opt {
+            None => row_start .. row_end,
+            Some(Range{start: s, end: e}) => s .. ::std::cmp::max(e, row_end),
+        };
+        if row_start.0 <= maxes.start.0 - recording::MAX_RECORDING_DURATION {
+            break;
+        }
+        maxes_opt = Some(maxes);
+    }
+    let max_end = match maxes_opt {
+        Some(Range{start: _, end: e}) => e,
+        None => bail!("missing max for stream {} which had min {}", stream_id, min_start),
+    };
+    Ok(Some(min_start .. max_end))
+}
+
+/// Lists all garbage ids for the given sample file directory.
+pub(crate) fn list_garbage(conn: &rusqlite::Connection, dir_id: i32)
+                           -> Result, Error> {
+    let mut garbage = FnvHashSet::default();
+    let mut stmt = conn.prepare_cached(
+        "select composite_id from garbage where sample_file_dir_id = ?")?;
+    let mut rows = stmt.query(&[&dir_id])?;
+    while let Some(row) = rows.next() {
+        let row = row?;
+        garbage.insert(CompositeId(row.get_checked(0)?));
+    }
+    Ok(garbage)
+}
diff --git a/db/schema.sql b/db/schema.sql
index a8c87ea..97f5c13 100644
--- a/db/schema.sql
+++ b/db/schema.sql
@@ -108,6 +108,11 @@ create table stream (
   -- file. Older files will be deleted as necessary to stay within this limit.
   retain_bytes integer not null check (retain_bytes >= 0),
 
+  -- Flush the database when completing a recording if this stream has at
+  -- least this many seconds of unflushed recordings. A value of 0 means that
+  -- every completed recording will cause a flush.
+  flush_if_sec integer not null,
+
   -- The low 32 bits of the next recording id to assign for this stream.
   -- Typically this is the maximum current recording + 1, but it does
   -- not decrease if that recording is deleted.
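To illustrate what the new column buys, here is a minimal sketch (not taken from the patch; the struct and field names are hypothetical stand-ins for the per-stream state the syncer consults) of the decision made after each completed recording: nothing is committed until a stream has accumulated at least `flush_if_sec` seconds of unflushed recordings, so a value of 0 preserves the old commit-per-recording behavior while larger values batch many recordings into one commit.

use std::time::Duration;

// Hypothetical stand-in for the per-stream state the syncer consults; the
// real code keeps the equivalent information on the stream row itself.
struct StreamFlushState {
    unflushed: Duration, // completed-but-unflushed recording time
    flush_if: Duration,  // loaded from the stream's `flush_if_sec` column
}

// With flush_if_sec = 0 every completed recording flushes (the old behavior);
// larger values batch more recordings per commit, saving SSD write cycles.
fn should_flush(s: &StreamFlushState) -> bool {
    s.unflushed >= s.flush_if
}

fn main() {
    let s = StreamFlushState {
        unflushed: Duration::from_secs(45),
        flush_if: Duration::from_secs(60),
    };
    assert!(!should_flush(&s)); // 45s accumulated < 60s threshold: keep batching
    let s = StreamFlushState { unflushed: Duration::from_secs(60), ..s };
    assert!(should_flush(&s));  // threshold reached: commit once for the batch
}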
diff --git a/db/testutil.rs b/db/testutil.rs
index 2c54ca5..b0d1450 100644
--- a/db/testutil.rs
+++ b/db/testutil.rs
@@ -86,28 +86,29 @@ impl TestDb {
         let dir;
         {
             let mut l = db.lock();
-            {
-                sample_file_dir_id = l.add_sample_file_dir(path.to_owned()).unwrap();
-                assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange {
-                    short_name: "test camera".to_owned(),
-                    description: "".to_owned(),
-                    host: "test-camera".to_owned(),
-                    username: "foo".to_owned(),
-                    password: "bar".to_owned(),
-                    streams: [
-                        db::StreamChange {
-                            sample_file_dir_id: Some(sample_file_dir_id),
-                            rtsp_path: "/main".to_owned(),
-                            record: true,
-                        },
-                        Default::default(),
-                    ],
-                }).unwrap());
-                test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid;
-                let mut tx = l.tx().unwrap();
-                tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap();
-                tx.commit().unwrap();
-            }
+            sample_file_dir_id = l.add_sample_file_dir(path.to_owned()).unwrap();
+            assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange {
+                short_name: "test camera".to_owned(),
+                description: "".to_owned(),
+                host: "test-camera".to_owned(),
+                username: "foo".to_owned(),
+                password: "bar".to_owned(),
+                streams: [
+                    db::StreamChange {
+                        sample_file_dir_id: Some(sample_file_dir_id),
+                        rtsp_path: "/main".to_owned(),
+                        record: true,
+                        flush_if_sec: 0,
+                    },
+                    Default::default(),
+                ],
+            }).unwrap());
+            test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid;
+            l.update_retention(&[db::RetentionChange {
+                stream_id: TEST_STREAM_ID,
+                new_record: true,
+                new_limit: 1048576,
+            }]).unwrap();
             dir = l.sample_file_dirs_by_id().get(&sample_file_dir_id).unwrap().get().unwrap();
         }
         let mut dirs_by_stream_id = FnvHashMap::default();
@@ -129,28 +130,25 @@ impl TestDb {
         let mut db = self.db.lock();
         let video_sample_entry_id = db.insert_video_sample_entry(
             1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap();
-        let next = db.streams_by_id().get(&TEST_STREAM_ID).unwrap().next_recording_id;
-        {
-            let mut tx = db.tx().unwrap();
-            const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
-            tx.insert_recording(&db::RecordingToInsert {
-                id: db::CompositeId::new(TEST_STREAM_ID, next),
-                sample_file_bytes: encoder.sample_file_bytes,
-                time: START_TIME ..
-                      START_TIME + recording::Duration(encoder.total_duration_90k as i64),
-                local_time_delta: recording::Duration(0),
-                video_samples: encoder.video_samples,
-                video_sync_samples: encoder.video_sync_samples,
-                video_sample_entry_id: video_sample_entry_id,
-                video_index: encoder.video_index,
-                sample_file_sha1: [0u8; 20],
-                run_offset: 0,
-                flags: db::RecordingFlags::TrailingZero as i32,
-            }).unwrap();
-            tx.commit().unwrap();
-        }
+        const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC);
+        let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap();
+        u.lock().recording = Some(db::RecordingToInsert {
+            sample_file_bytes: encoder.sample_file_bytes,
+            time: START_TIME ..
+                  START_TIME + recording::Duration(encoder.total_duration_90k as i64),
+            local_time_delta: recording::Duration(0),
+            video_samples: encoder.video_samples,
+            video_sync_samples: encoder.video_sync_samples,
+            video_sample_entry_id: video_sample_entry_id,
+            video_index: encoder.video_index,
+            sample_file_sha1: [0u8; 20],
+            run_offset: 0,
+            flags: db::RecordingFlags::TrailingZero as i32,
+        });
+        u.lock().synced = true;
+        db.flush("create_recording_from_encoder").unwrap();
         let mut row = None;
-        db.list_recordings_by_id(TEST_STREAM_ID, next .. next+1,
+        db.list_recordings_by_id(TEST_STREAM_ID, id.recording() .. id.recording()+1,
                                  |r| { row = Some(r); Ok(()) }).unwrap();
         row.unwrap()
     }
diff --git a/guide/schema.md b/guide/schema.md
index daf82d2..1e21bee 100644
--- a/guide/schema.md
+++ b/guide/schema.md
@@ -207,6 +207,8 @@ is never used.
 Version 3 adds over version 1:
 
 * recording of sub streams (splits a new `stream` table out of `camera`)
+* a per-stream knob `flush_if_sec` meant to reduce database commits (and
+  thus SSD write cycles). This improves the practicality of having many streams.
 * support for multiple sample file directories, to take advantage of multiple
   hard drives (or multiple RAID volumes).
 * an interlock between database and sample file directories to avoid various
diff --git a/src/clock.rs b/src/clock.rs
index ef60898..7e547dd 100644
--- a/src/clock.rs
+++ b/src/clock.rs
@@ -31,7 +31,7 @@
 //! Clock interface and implementations for testability.
 
 use libc;
-#[cfg(test)] use std::sync::Mutex;
+#[cfg(test)] use parking_lot::Mutex;
 use std::mem;
 use std::thread;
 use time::{Duration, Timespec};
@@ -123,12 +123,12 @@ impl SimulatedClocks {
 
 #[cfg(test)]
 impl Clocks for SimulatedClocks {
-    fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock().unwrap() }
-    fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock().unwrap() }
+    fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock() }
+    fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock() }
 
     /// Advances the clock by the specified amount without actually sleeping.
     fn sleep(&self, how_long: Duration) {
-        let mut l = self.uptime.lock().unwrap();
+        let mut l = self.uptime.lock();
         *l = *l + how_long;
     }
 }
diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs
index f6358b7..1346ab9 100644
--- a/src/cmds/config/cameras.rs
+++ b/src/cmds/config/cameras.rs
@@ -36,6 +36,7 @@ use self::cursive::views;
 use db::{self, dir};
 use failure::Error;
 use std::collections::BTreeMap;
+use std::str::FromStr;
 use std::sync::Arc;
 use stream::{self, Opener, Stream};
 use super::{decode_size, encode_size};
@@ -62,6 +63,9 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange {
                  .unwrap().get_content().as_str().into();
         let r = siv.find_id::(&format!("{}_record", t.as_str()))
                    .unwrap().is_checked();
+        let f = i64::from_str(siv.find_id::(
+                    &format!("{}_flush_if_sec", t.as_str())).unwrap().get_content().as_str())
+                .unwrap_or(0);
         let d = *siv.find_id::>>(
                      &format!("{}_sample_file_dir", t.as_str()))
                     .unwrap().selection();
@@ -69,6 +73,7 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange {
             rtsp_path: p,
             sample_file_dir_id: d,
             record: r,
+            flush_if_sec: f,
         };
     }
     c
@@ -270,9 +275,11 @@ fn edit_camera_dialog(db: &Arc, siv: &mut Cursive, item: &Option, siv: &mut Cursive, item: &Option>| v.set_selection(selected_dir));
diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs
index 0b51a9a..7a31280 100644
--- a/src/cmds/config/dirs.rs
+++ b/src/cmds/config/dirs.rs
@@ -60,12 +60,15 @@ struct Model {
 
 /// Updates the limits in the database. Doesn't delete excess data (if any).
 fn update_limits_inner(model: &Model) -> Result<(), Error> {
-    let mut db = model.db.lock();
-    let mut tx = db.tx()?;
-    for (&id, stream) in &model.streams {
-        tx.update_retention(id, stream.record, stream.retain.unwrap())?;
+    let mut changes = Vec::with_capacity(model.streams.len());
+    for (&stream_id, stream) in &model.streams {
+        changes.push(db::RetentionChange {
+            stream_id,
+            new_record: stream.record,
+            new_limit: stream.retain.unwrap(),
+        });
     }
-    tx.commit()
+    model.db.lock().update_retention(&changes)
 }
 
 fn update_limits(model: &Model, siv: &mut Cursive) {
diff --git a/src/cmds/run.rs b/src/cmds/run.rs
index 34dd96b..73cee32 100644
--- a/src/cmds/run.rs
+++ b/src/cmds/run.rs
@@ -199,6 +199,9 @@ pub fn run() -> Result<(), Error> {
     }
 
     if let Some(mut ss) = syncers {
+        // The syncers shut down when all channels to them have been dropped.
+        // The database maintains one, and `ss` holds another; drop both.
+        db.lock().clear_on_flush();
         for (_, s) in ss.drain() {
             drop(s.channel);
             s.join.join().unwrap();
diff --git a/src/cmds/upgrade/v1_to_v2.rs b/src/cmds/upgrade/v1_to_v2.rs
index d672a97..ff908bb 100644
--- a/src/cmds/upgrade/v1_to_v2.rs
+++ b/src/cmds/upgrade/v1_to_v2.rs
@@ -159,6 +159,7 @@ impl<'a> super::Upgrader for U<'a> {
           record integer not null check (record in (1, 0)),
           rtsp_path text not null,
           retain_bytes integer not null check (retain_bytes >= 0),
+          flush_if_sec integer not null,
           next_recording_id integer not null check (next_recording_id >= 0),
           unique (camera_id, type)
         );
@@ -227,6 +228,7 @@ impl<'a> super::Upgrader for U<'a> {
           1,
           old_camera.main_rtsp_path,
           old_camera.retain_bytes,
+          0,
           old_camera.next_recording_id
         from
           old_camera cross join sample_file_dir;
@@ -241,7 +243,8 @@ impl<'a> super::Upgrader for U<'a> {
           0,
           old_camera.sub_rtsp_path,
           0,
-          0
+          60,
+          1
         from
           old_camera cross join sample_file_dir
         where
diff --git a/src/json.rs b/src/json.rs
index ba60d21..cc9ca40 100644
--- a/src/json.rs
+++ b/src/json.rs
@@ -34,7 +34,7 @@ use serde::ser::{SerializeMap, SerializeSeq, Serializer};
 use std::collections::BTreeMap;
 use uuid::Uuid;
 
-#[derive(Debug, Serialize)]
+#[derive(Serialize)]
 #[serde(rename_all="camelCase")]
 pub struct TopLevel<'a> {
     pub time_zone_name: &'a str,
diff --git a/src/mp4.rs b/src/mp4.rs
index 72a8fed..69feeca 100644
--- a/src/mp4.rs
+++ b/src/mp4.rs
@@ -2105,6 +2105,7 @@ mod tests {
         const EXPECTED_ETAG: &'static str = "c56ef7eb3b4a713ceafebc3dc7958bd9e62a2fae";
         assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
         drop(db.syncer_channel);
+        db.db.lock().clear_on_flush();
         db.syncer_join.join().unwrap();
     }
 
@@ -2125,6 +2126,7 @@ mod tests {
         const EXPECTED_ETAG: &'static str = "3bdc2c8ce521df50155d0ca4d7497ada448fa7c3";
         assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
         drop(db.syncer_channel);
+        db.db.lock().clear_on_flush();
         db.syncer_join.join().unwrap();
     }
 
@@ -2145,6 +2147,7 @@ mod tests {
         const EXPECTED_ETAG: &'static str = "3986d3bd9b866c3455fb7359fb134aa2d9107af7";
         assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
         drop(db.syncer_channel);
+        db.db.lock().clear_on_flush();
         db.syncer_join.join().unwrap();
     }
 
@@ -2165,6 +2168,7 @@ mod tests {
         const EXPECTED_ETAG: &'static str = "9e789398c9a71ca834fec8fbc55b389f99d12dda";
         assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
         drop(db.syncer_channel);
+        db.db.lock().clear_on_flush();
         db.syncer_join.join().unwrap();
     }
 }
diff --git a/src/streamer.rs b/src/streamer.rs
index a5ec5d3..9991725 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -205,8 +205,9 @@ mod tests {
     use failure::Error;
     use h264;
     use moonfire_ffmpeg;
+    use parking_lot::Mutex;
     use std::cmp;
-    use std::sync::{Arc, Mutex};
+    use std::sync::Arc;
     use std::sync::atomic::{AtomicBool, Ordering};
     use stream::{self, Opener, Stream};
     use time;
@@ -290,7 +291,7 @@ mod tests {
                 stream::Source::Rtsp(url) => assert_eq!(url, &self.expected_url),
                 stream::Source::File(_) => panic!("expected rtsp url"),
             };
-            let mut l = self.streams.lock().unwrap();
+            let mut l = self.streams.lock();
             match l.pop() {
                 Some(stream) => {
                     trace!("MockOpener returning next stream");
@@ -361,7 +362,7 @@ mod tests {
                               testutil::TEST_STREAM_ID, camera, s, 0, 3);
         }
         stream.run();
-        assert!(opener.streams.lock().unwrap().is_empty());
+        assert!(opener.streams.lock().is_empty());
         db.syncer_channel.flush();
         let db = db.db.lock();
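The shutdown changes in src/cmds/run.rs and the mp4 tests above share one subtlety: the syncer thread only exits once every sender to its channel is gone, and after this patch the database itself keeps one alive inside the `on_flush` callback. The sketch below uses only plain std types and is independent of the Moonfire code (the `Db` struct and its field are illustrative stand-ins); it shows why both `clear_on_flush()` and dropping the caller's own channel are needed before `join()` can return.

use std::sync::mpsc;
use std::thread;

// Illustrative stand-in for the database's flush-notification hook; the real
// patch registers it with `on_flush` and drops it in `clear_on_flush`.
struct Db {
    on_flush: Option<Box<dyn Fn() + Send>>,
}

fn main() {
    let (snd, rcv) = mpsc::channel::<&'static str>();

    // The "database" keeps one sender alive inside its flush callback...
    let mut db = Db {
        on_flush: Some(Box::new({
            let snd = snd.clone();
            move || { let _ = snd.send("flushed"); }
        })),
    };

    // ...and the caller holds the other, analogous to the syncer channels in `ss`.
    let syncer = thread::spawn(move || {
        // This loop ends only when *all* senders have been dropped.
        while let Ok(msg) = rcv.recv() {
            println!("syncer saw: {}", msg);
        }
    });

    if let Some(f) = &db.on_flush { f(); }  // a flush notifies the syncer

    // Shutdown requires dropping both senders, mirroring run.rs:
    db.on_flush = None;      // like db.lock().clear_on_flush()
    drop(snd);               // like dropping s.channel
    syncer.join().unwrap();  // only now does the join complete
}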