diff --git a/db/db.rs b/db/db.rs index 46bd3cc..4b393a9 100644 --- a/db/db.rs +++ b/db/db.rs @@ -48,19 +48,21 @@ //! the critical path of recording frames. The caller should preallocate sample file uuids //! and such to avoid database operations in these paths. //! -//! * the `Transaction` interface allows callers to batch write operations to reduce latency and -//! SSD write cycles. +//! * adding and removing recordings done during normal operations use a batch interface. +//! A list of mutations is built up in-memory and occasionally flushed to reduce SSD write +//! cycles. use dir; use failure::Error; -use fnv::{self, FnvHashMap}; +use fnv::{self, FnvHashMap, FnvHashSet}; use lru_cache::LruCache; use openssl::hash; use parking_lot::{Mutex,MutexGuard}; +use raw; use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite; use schema; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::collections::btree_map; use std::cell::RefCell; use std::cmp; @@ -86,32 +88,11 @@ const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" composite_id = :composite_id "#; -const DELETE_GARBAGE_SQL: &'static str = - "delete from garbage where composite_id = :composite_id"; - -const INSERT_GARBAGE_SQL: &'static str = - "insert into garbage (sample_file_dir_id, composite_id) - values (:sample_file_dir_id, :composite_id)"; - const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" insert into video_sample_entry (sha1, width, height, rfc6381_codec, data) values (:sha1, :width, :height, :rfc6381_codec, :data) "#; -const INSERT_RECORDING_SQL: &'static str = r#" - insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes, - start_time_90k, duration_90k, local_time_delta_90k, video_samples, - video_sync_samples, video_sample_entry_id) - values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes, - :start_time_90k, :duration_90k, :local_time_delta_90k, - :video_samples, :video_sync_samples, :video_sample_entry_id) -"#; - -const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" - insert into recording_playback (composite_id, sample_file_sha1, video_index) - values (:composite_id, :sample_file_sha1, :video_index) -"#; - const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = "update stream set next_recording_id = :next_recording_id where id = :stream_id"; @@ -130,35 +111,6 @@ const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" composite_id "#; -const DELETE_RECORDING_SQL: &'static str = r#" - delete from recording where composite_id = :composite_id -"#; - -const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#" - delete from recording_playback where composite_id = :composite_id -"#; - -const STREAM_MIN_START_SQL: &'static str = r#" - select - start_time_90k - from - recording - where - stream_id = :stream_id - order by start_time_90k limit 1 -"#; - -const STREAM_MAX_START_SQL: &'static str = r#" - select - start_time_90k, - duration_90k - from - recording - where - stream_id = :stream_id - order by start_time_90k desc; -"#; - const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" select recording.composite_id, @@ -257,9 +209,8 @@ pub enum RecordingFlags { } /// A recording to pass to `insert_recording`. -#[derive(Debug)] -pub struct RecordingToInsert { - pub id: CompositeId, +#[derive(Clone, Debug)] +pub(crate) struct RecordingToInsert { pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, @@ -274,8 +225,9 @@ pub struct RecordingToInsert { /// A row used in `list_oldest_sample_files`. #[derive(Debug)] -pub struct ListOldestSampleFilesRow { +pub(crate) struct ListOldestSampleFilesRow { pub id: CompositeId, + pub sample_file_dir_id: i32, pub time: Range, pub sample_file_bytes: i32, } @@ -329,6 +281,8 @@ pub struct SampleFileDir { pub uuid: Uuid, dir: Option>, last_complete_open: Option, + to_gc: Vec, + pub(crate) garbage: FnvHashSet, } impl SampleFileDir { @@ -407,6 +361,10 @@ pub struct Stream { pub type_: StreamType, pub rtsp_path: String, pub retain_bytes: i64, + pub flush_if_sec: i64, + + /// `flush_if_sec` converted to a duration for convenience. + pub flush_if: recording::Duration, /// The time range of recorded data associated with this stream (minimum start time and maximum /// end time). `None` iff there are no recordings for this camera. @@ -420,7 +378,48 @@ pub struct Stream { /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. pub days: BTreeMap, pub record: bool, - pub next_recording_id: i32, + + /// The `next_recording_id` currently committed to the database. + pub(crate) next_recording_id: i32, + + /// The recordings which have been added via `LockedDatabase::add_recording` but have yet to + /// committed to the database. + /// + /// `uncommitted[i]` uses sample filename `CompositeId::new(id, next_recording_id + 1)`; + /// `next_recording_id` should be advanced when one is committed to maintain this invariant. + /// + /// TODO: alter the serving path to show these just as if they were already committed. + uncommitted: VecDeque>>, +} + +impl Stream { + /// Returns the duration of synced but uncommitted recordings for the given stream. + /// Note recordings must be flushed in order, so a recording is considered unsynced if any + /// before it are unsynced. + pub(crate) fn unflushed(&self) -> recording::Duration { + let mut dur = recording::Duration(0); + for u in &self.uncommitted { + let l = u.lock(); + if !l.synced { + break; + } + if let Some(ref r) = l.recording { + dur += r.time.end - r.time.start; + } + } + dur + } +} + +#[derive(Debug)] +pub(crate) struct UncommittedRecording { + /// If this recording has been synced to disk and thus is ready to commit to the database. + /// `recording` should not be modified after this is set to true. + pub(crate) synced: bool, + + /// The recording to add. Absent iff the recording has been abandoned. + /// TODO: modify `SampleIndexEncoder` to update this record as it goes. + pub(crate) recording: Option, } #[derive(Debug, Default)] @@ -428,6 +427,7 @@ pub struct StreamChange { pub sample_file_dir_id: Option, pub rtsp_path: String, pub record: bool, + pub flush_if_sec: i64, } /// Information about a camera, used by `add_camera` and `update_camera`. @@ -564,17 +564,8 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam Ok(()) } -#[derive(Debug)] pub struct LockedDatabase { conn: rusqlite::Connection, - state: State, -} - -/// In-memory state from the database. -/// This is separated out of `LockedDatabase` so that `Transaction` can mutably borrow `state` -/// while its underlying `rusqlite::Transaction` is borrowing `conn`. -#[derive(Debug)] -struct State { uuid: Uuid, /// If the database is open in read-write mode, the information about the current Open row. @@ -582,10 +573,15 @@ struct State { sample_file_dirs_by_id: BTreeMap, cameras_by_id: BTreeMap, streams_by_id: BTreeMap, - cameras_by_uuid: BTreeMap, + cameras_by_uuid: BTreeMap, // values are ids. video_sample_entries: BTreeMap>, list_recordings_by_time_sql: String, video_index_cache: RefCell, fnv::FnvBuildHasher>>, + on_flush: Vec>, + + /// Recordings which have been enqueued for deletion via `LockedDatabase::delete_recordings` + /// but have yet to be committed. + to_delete: Vec, } #[derive(Copy, Clone, Debug)] @@ -594,48 +590,14 @@ struct Open { uuid: Uuid, } -/// A high-level transaction. This manages the SQLite transaction and the matching modification to -/// be applied to the in-memory state on successful commit. -pub struct Transaction<'a> { - state: &'a mut State, - mods_by_stream: FnvHashMap, - tx: rusqlite::Transaction<'a>, - - /// True if due to an earlier error the transaction must be rolled back rather than committed. - /// Insert and delete are multi-part. If later parts fail, earlier parts should be aborted as - /// well. We could use savepoints (nested transactions) for this, but for simplicity we just - /// require the entire transaction be rolled back. - must_rollback: bool, -} - -/// A modification to be done to a `Stream` after a `Transaction` is committed. +/// A modification to be done to a `Stream`, used within `LockedDatabase::flush`. #[derive(Default)] struct StreamModification { - /// Add this to `camera.duration`. Thus, positive values indicate a net addition; - /// negative values indicate a net subtraction. - duration: recording::Duration, - - /// Add this to `camera.sample_file_bytes`. - sample_file_bytes: i64, - - /// Add this to `stream.days`. - days: BTreeMap, - - /// Reset the Stream range to this value. This should be populated immediately prior to the - /// commit. + num_recordings_to_commit: i32, range: Option>, - - /// Reset the next_recording_id to the specified value. - new_next_recording_id: Option, - - /// Reset the retain_bytes to the specified value. - new_retain_bytes: Option, - - /// Reset the record to the specified value. - new_record: Option, } -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct CompositeId(pub i64); impl CompositeId { @@ -653,231 +615,6 @@ impl ::std::fmt::Display for CompositeId { } } -impl<'a> Transaction<'a> { - /// Deletes the given recordings from the `recording` and `recording_playback` tables. - /// Note they are not fully removed from the database; the uuids are transferred to the - /// `garbage` table. The caller should `unlink` the files, then remove the `garbage` row. - pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> { - let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?; - let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; - let mut insert = self.tx.prepare_cached(INSERT_GARBAGE_SQL)?; - - self.check_must_rollback()?; - self.must_rollback = true; - for row in rows { - let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording {}", row.id); - } - let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording_playback {}", row.id); - } - let sid = row.id.stream(); - let did = self.state - .streams_by_id - .get(&sid) - .ok_or_else(|| format_err!("no such stream {}", sid))? - .sample_file_dir_id - .ok_or_else(|| format_err!("stream {} has no dir", sid))?; - insert.execute_named(&[ - (":sample_file_dir_id", &did), - (":composite_id", &row.id.0)], - )?; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.id.stream()); - m.duration -= row.time.end - row.time.start; - m.sample_file_bytes -= row.sample_file_bytes as i64; - adjust_days(row.time.clone(), -1, &mut m.days); - } - self.must_rollback = false; - Ok(()) - } - - /// Marks the given sample files as deleted. This shouldn't be called until the files have - /// been `unlink()`ed and the parent directory `fsync()`ed. - pub fn mark_sample_files_deleted(&mut self, ids: &[CompositeId]) -> Result<(), Error> { - if ids.is_empty() { return Ok(()); } - let mut stmt = self.tx.prepare_cached(DELETE_GARBAGE_SQL)?; - for &id in ids { - let changes = stmt.execute_named(&[(":composite_id", &id.0)])?; - if changes != 1 { - bail!("no garbage row for {}", id); - } - } - Ok(()) - } - - /// Inserts the specified recording. - pub fn insert_recording(&mut self, r: &RecordingToInsert) -> Result<(), Error> { - self.check_must_rollback()?; - - if r.time.end < r.time.start { - bail!("end time {} must be >= start time {}", r.time.end, r.time.start); - } - - // Check that the recording id is acceptable and do the insertion. - let stream = match self.state.streams_by_id.get(&r.id.stream()) { - None => bail!("no such stream id {}", r.id.stream()), - Some(s) => s, - }; - self.must_rollback = true; - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.id.stream()); - { - let next = m.new_next_recording_id.unwrap_or(stream.next_recording_id); - if r.id.recording() < next { - bail!("recording {} out of order; next id is {}!", r.id, next); - } - let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; - stmt.execute_named(&[ - (":composite_id", &r.id.0), - (":stream_id", &(r.id.stream() as i64)), - (":run_offset", &r.run_offset), - (":flags", &r.flags), - (":sample_file_bytes", &r.sample_file_bytes), - (":start_time_90k", &r.time.start.0), - (":duration_90k", &(r.time.end.0 - r.time.start.0)), - (":local_time_delta_90k", &r.local_time_delta.0), - (":video_samples", &r.video_samples), - (":video_sync_samples", &r.video_sync_samples), - (":video_sample_entry_id", &r.video_sample_entry_id), - ])?; - m.new_next_recording_id = Some(r.id.recording() + 1); - let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; - let sha1 = &r.sample_file_sha1[..]; - stmt.execute_named(&[ - (":composite_id", &r.id.0), - (":sample_file_sha1", &sha1), - (":video_index", &r.video_index), - ])?; - let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; - stmt.execute_named(&[ - (":stream_id", &(r.id.stream() as i64)), - (":next_recording_id", &m.new_next_recording_id), - ])?; - } - self.must_rollback = false; - m.duration += r.time.end - r.time.start; - m.sample_file_bytes += r.sample_file_bytes as i64; - adjust_days(r.time.clone(), 1, &mut m.days); - Ok(()) - } - - /// Updates the `record` and `retain_bytes` for the given stream. - /// Note this just resets the limit in the database; it's the caller's responsibility to ensure - /// current usage is under the new limit if desired. - pub fn update_retention(&mut self, stream_id: i32, new_record: bool, new_limit: i64) - -> Result<(), Error> { - if new_limit < 0 { - bail!("can't set limit for stream {} to {}; must be >= 0", stream_id, new_limit); - } - self.check_must_rollback()?; - let mut stmt = self.tx.prepare_cached(r#" - update stream - set - record = :record, - retain_bytes = :retain - where - id = :id - "#)?; - let changes = stmt.execute_named(&[ - (":record", &new_record), - (":retain", &new_limit), - (":id", &stream_id), - ])?; - if changes != 1 { - bail!("no such stream {}", stream_id); - } - let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id); - m.new_record = Some(new_record); - m.new_retain_bytes = Some(new_limit); - Ok(()) - } - - /// Commits these changes, consuming the Transaction. - pub fn commit(mut self) -> Result<(), Error> { - self.check_must_rollback()?; - self.precommit()?; - self.tx.commit()?; - for (&stream_id, m) in &self.mods_by_stream { - let stream = self.state.streams_by_id.get_mut(&stream_id) - .expect("modified stream must exist"); - stream.duration += m.duration; - stream.sample_file_bytes += m.sample_file_bytes; - for (k, v) in &m.days { - adjust_day(*k, *v, &mut stream.days); - } - stream.range = m.range.clone(); - if let Some(id) = m.new_next_recording_id { - stream.next_recording_id = id; - } - if let Some(r) = m.new_record { - stream.record = r; - } - if let Some(b) = m.new_retain_bytes { - stream.retain_bytes = b; - } - } - Ok(()) - } - - /// Raises an error if `must_rollback` is true. To be used on commit and in modifications. - fn check_must_rollback(&self) -> Result<(), Error> { - if self.must_rollback { - bail!("failing due to previous error"); - } - Ok(()) - } - - /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry. - fn get_mods_by_stream(mods: &mut FnvHashMap, stream_id: i32) - -> &mut StreamModification { - mods.entry(stream_id).or_insert_with(StreamModification::default) - } - - /// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the - /// commit succeeds, there's no possibility that the correct state can't be retrieved. - fn precommit(&mut self) -> Result<(), Error> { - // Recompute start and end times for each camera. - for (&stream_id, m) in &mut self.mods_by_stream { - // The minimum is straightforward, taking advantage of the start_time_90k index. - let mut stmt = self.tx.prepare_cached(STREAM_MIN_START_SQL)?; - let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; - let min_start = match rows.next() { - Some(row) => recording::Time(row?.get_checked(0)?), - None => continue, // no data; leave m.range alone. - }; - - // There was a minimum, so there should be a maximum too. Calculating it is less - // straightforward because recordings could overlap. All recordings starting in the - // last MAX_RECORDING_DURATION must be examined in order to take advantage of the - // start_time_90k index. - let mut stmt = self.tx.prepare_cached(STREAM_MAX_START_SQL)?; - let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; - let mut maxes_opt = None; - while let Some(row) = rows.next() { - let row = row?; - let row_start = recording::Time(row.get_checked(0)?); - let row_duration: i64 = row.get_checked(1)?; - let row_end = recording::Time(row_start.0 + row_duration); - let maxes = match maxes_opt { - None => row_start .. row_end, - Some(Range{start: s, end: e}) => s .. cmp::max(e, row_end), - }; - if row_start.0 <= maxes.start.0 - recording::MAX_RECORDING_DURATION { - break; - } - maxes_opt = Some(maxes); - } - let max_end = match maxes_opt { - Some(Range{end: e, ..}) => e, - None => bail!("missing max for stream {} which had min {}", stream_id, min_start), - }; - m.range = Some(min_start .. max_end); - } - Ok(()) - } -} - /// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange` /// structs. struct StreamStateChanger { @@ -941,6 +678,9 @@ impl StreamStateChanger { sample_file_dir_id: sc.sample_file_dir_id, rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), record: sc.record, + flush_if_sec: sc.flush_if_sec, + flush_if: recording::Duration(sc.flush_if_sec * + recording::TIME_UNITS_PER_SEC), ..s }))); } @@ -952,9 +692,9 @@ impl StreamStateChanger { // Insert stream. let mut stmt = tx.prepare_cached(r#" insert into stream (camera_id, sample_file_dir_id, type, rtsp_path, record, - retain_bytes, next_recording_id) + retain_bytes, flush_if_sec, next_recording_id) values (:camera_id, :sample_file_dir_id, :type, :rtsp_path, :record, - 0, 1) + 0, :flush_if_sec, 1) "#)?; let type_ = StreamType::from_index(i).unwrap(); stmt.execute_named(&[ @@ -963,6 +703,7 @@ impl StreamStateChanger { (":type", &type_.as_str()), (":rtsp_path", &sc.rtsp_path), (":record", &sc.record), + (":flush_if_sec", &sc.flush_if_sec), ])?; let id = tx.last_insert_rowid() as i32; sids[i] = Some(id); @@ -973,12 +714,15 @@ impl StreamStateChanger { sample_file_dir_id: sc.sample_file_dir_id, rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), retain_bytes: 0, + flush_if_sec: sc.flush_if_sec, + flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, duration: recording::Duration(0), days: BTreeMap::new(), record: sc.record, next_recording_id: 1, + uncommitted: VecDeque::new(), }))); } } @@ -1004,11 +748,156 @@ impl StreamStateChanger { } } +/// A retention change as expected by `LockedDatabase::update_retention`. +pub struct RetentionChange { + pub stream_id: i32, + pub new_record: bool, + pub new_limit: i64, +} + impl LockedDatabase { /// Returns an immutable view of the cameras by id. - pub fn cameras_by_id(&self) -> &BTreeMap { &self.state.cameras_by_id } + pub fn cameras_by_id(&self) -> &BTreeMap { &self.cameras_by_id } pub fn sample_file_dirs_by_id(&self) -> &BTreeMap { - &self.state.sample_file_dirs_by_id + &self.sample_file_dirs_by_id + } + + pub(crate) fn add_recording(&mut self, stream_id: i32) + -> Result<(CompositeId, Arc>), Error> { + let stream = match self.streams_by_id.get_mut(&stream_id) { + None => bail!("no such stream {}", stream_id), + Some(s) => s, + }; + let id = CompositeId::new(stream_id, + stream.next_recording_id + (stream.uncommitted.len() as i32)); + let recording = Arc::new(Mutex::new(UncommittedRecording { + synced: false, + recording: None, + })); + stream.uncommitted.push_back(Arc::clone(&recording)); + Ok((id, recording)) + } + + pub(crate) fn delete_recordings(&mut self, rows: &mut Vec) { + self.to_delete.append(rows); + } + + pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec) + -> Result<(), Error> { + let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) { + None => bail!("no such dir {}", dir_id), + Some(d) => d, + }; + dir.to_gc.append(ids); + Ok(()) + } + + /// Tries to flush unwritten changes from the stream directories. + /// + /// * commits any recordings added with `add_recording` that have since been marked as + /// synced. + /// * moves old recordings to the garbage table as requested by `delete_recordings`. + /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`. + /// + /// On success, for each affected sample file directory with a flush watcher set, sends a + /// `Flush` event. + pub(crate) fn flush(&mut self, reason: &str) -> Result<(), Error> { + let tx = self.conn.transaction()?; + let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), + Default::default()); + raw::delete_recordings(&tx, &self.to_delete)?; + for row in &self.to_delete { + // Add a placeholder for recomputing the range. + mods.entry(row.id.stream()).or_insert_with(StreamModification::default); + + let dir = match self.sample_file_dirs_by_id.get_mut(&row.sample_file_dir_id) { + None => bail!("Row refers to nonexistent sample file dir: {:#?}", row), + Some(d) => d, + }; + dir.garbage.insert(row.id); + } + { + let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; + for (&stream_id, s) in &self.streams_by_id { + let mut i = 0; + for recording in &s.uncommitted { + let l = recording.lock(); + if !l.synced { break; } + if let Some(ref r) = l.recording { + raw::insert_recording( + &tx, CompositeId::new(stream_id, s.next_recording_id + i), &r)?; + } + i += 1; + } + if i > 0 { + let m = mods.entry(stream_id).or_insert_with(StreamModification::default); + m.num_recordings_to_commit = i; + stmt.execute_named(&[ + (":stream_id", &stream_id), + (":next_recording_id", &(s.next_recording_id + i)), + ])?; + } + } + } + for dir in self.sample_file_dirs_by_id.values() { + raw::mark_sample_files_deleted(&tx, &dir.to_gc)?; + } + for (&stream_id, m) in &mut mods { + m.range = raw::get_range(&tx, stream_id)?; + } + tx.commit()?; + + // Process delete_recordings. + let deleted = self.to_delete.len(); + for row in self.to_delete.drain(..) { + let s = self.streams_by_id.get_mut(&row.id.stream()).unwrap(); + s.duration -= row.time.end - row.time.start; + s.sample_file_bytes -= row.sample_file_bytes as i64; + adjust_days(row.time, -1, &mut s.days); + } + + // Process delete_garbage. + let mut gced = 0; + for dir in self.sample_file_dirs_by_id.values_mut() { + gced += dir.to_gc.len(); + for id in dir.to_gc.drain(..) { + dir.garbage.remove(&id); + } + } + + // Process add_recordings. + let mut added = 0; + for (stream_id, m) in mods.drain() { + let s = self.streams_by_id.get_mut(&stream_id).unwrap(); + s.next_recording_id += m.num_recordings_to_commit; + added += m.num_recordings_to_commit; + for _ in 0..m.num_recordings_to_commit { + let u = s.uncommitted.pop_front().unwrap(); + let l = u.lock(); + if let Some(ref r) = l.recording { + s.add_recording(r.time.clone(), r.sample_file_bytes); + } + } + s.range = m.range; + } + info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", + reason, added, deleted, gced); + for cb in &self.on_flush { + cb(); + } + Ok(()) + } + + /// Sets a watcher which will receive an (empty) event on successful flush. + /// The lock will be held while this is run, so it should not do any I/O. + pub(crate) fn on_flush(&mut self, run: Box) { + self.on_flush.push(run); + } + + // TODO: find a cleaner way to do this. Seems weird for src/cmds/run.rs to clear the on flush + // handlers given that it didn't add them. + pub fn clear_on_flush(&mut self) { + self.on_flush.clear(); } /// Opens the given sample file directories. @@ -1025,7 +914,7 @@ impl LockedDatabase { /// in practice. pub fn open_sample_file_dirs(&mut self, ids: &[i32]) -> Result<(), Error> { let mut in_progress = FnvHashMap::with_capacity_and_hasher(ids.len(), Default::default()); - let o = self.state.open.as_ref(); + let o = self.open.as_ref(); for &id in ids { let e = in_progress.entry(id); use ::std::collections::hash_map::Entry; @@ -1033,13 +922,13 @@ impl LockedDatabase { Entry::Occupied(_) => continue, // suppress duplicate. Entry::Vacant(e) => e, }; - let dir = self.state + let dir = self .sample_file_dirs_by_id .get_mut(&id) .ok_or_else(|| format_err!("no such dir {}", id))?; if dir.dir.is_some() { continue } let mut meta = schema::DirMeta::default(); - meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); + meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(&dir.uuid.as_bytes()[..]); if let Some(o) = dir.last_complete_open { let open = meta.mut_last_complete_open(); @@ -1078,7 +967,7 @@ impl LockedDatabase { tx.commit()?; for (id, (mut meta, d)) in in_progress.drain() { - let dir = self.state.sample_file_dirs_by_id.get_mut(&id).unwrap(); + let dir = self.sample_file_dirs_by_id.get_mut(&id).unwrap(); meta.last_complete_open.clear(); mem::swap(&mut meta.last_complete_open, &mut meta.in_progress_open); d.write_meta(&meta)?; @@ -1088,30 +977,17 @@ impl LockedDatabase { Ok(()) } - pub fn streams_by_id(&self) -> &BTreeMap { &self.state.streams_by_id } + pub fn streams_by_id(&self) -> &BTreeMap { &self.streams_by_id } /// Returns an immutable view of the video sample entries. pub fn video_sample_entries(&self) -> btree_map::Values> { - self.state.video_sample_entries.values() - } - - /// Starts a transaction for a write operation. - /// Note transactions are not needed for read operations; this process holds a lock on the - /// database directory, and the connection is locked within the process, so having a - /// `LockedDatabase` is sufficient to ensure a consistent view. - pub fn tx(&mut self) -> Result { - Ok(Transaction { - state: &mut self.state, - mods_by_stream: FnvHashMap::default(), - tx: self.conn.transaction()?, - must_rollback: false, - }) + self.video_sample_entries.values() } /// Gets a given camera by uuid. pub fn get_camera(&self, uuid: Uuid) -> Option<&Camera> { - match self.state.cameras_by_uuid.get(&uuid) { - Some(id) => Some(self.state.cameras_by_id.get(id).expect("uuid->id requires id->cam")), + match self.cameras_by_uuid.get(&uuid) { + Some(id) => Some(self.cameras_by_id.get(id).expect("uuid->id requires id->cam")), None => None, } } @@ -1121,7 +997,7 @@ impl LockedDatabase { pub fn list_recordings_by_time(&self, stream_id: i32, desired_time: Range, f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?; + let mut stmt = self.conn.prepare_cached(&self.list_recordings_by_time_sql)?; let rows = stmt.query_named(&[ (":stream_id", &stream_id), (":start_time_90k", &desired_time.start.0), @@ -1147,7 +1023,7 @@ impl LockedDatabase { let row = row?; let id = CompositeId(row.get_checked::<_, i64>(0)?); let vse_id = row.get_checked(8)?; - let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) { + let video_sample_entry = match self.video_sample_entries.get(&vse_id) { Some(v) => v, None => bail!("recording {} references nonexistent video_sample_entry {}", id, vse_id), @@ -1245,7 +1121,7 @@ impl LockedDatabase { /// This uses a LRU cache to reduce the number of retrievals from the database. pub fn with_recording_playback(&self, id: CompositeId, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { - let mut cache = self.state.video_index_cache.borrow_mut(); + let mut cache = self.video_index_cache.borrow_mut(); if let Some(video_index) = cache.get_mut(&id.0) { trace!("cache hit for recording {}", id); return f(&RecordingPlayback { video_index }); @@ -1263,23 +1139,18 @@ impl LockedDatabase { Err(format_err!("no such recording {}", id)) } - /// Lists all garbage ids. - pub fn list_garbage(&self, dir_id: i32) -> Result, Error> { - let mut garbage = Vec::new(); - let mut stmt = self.conn.prepare_cached( - "select composite_id from garbage where sample_file_dir_id = ?")?; - let mut rows = stmt.query(&[&dir_id])?; - while let Some(row) = rows.next() { - let row = row?; - garbage.push(CompositeId(row.get_checked(0)?)); - } - Ok(garbage) - } - /// Lists the oldest sample files (to delete to free room). /// `f` should return true as long as further rows are desired. - pub fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> + pub(crate) fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> where F: FnMut(ListOldestSampleFilesRow) -> bool { + let s = match self.streams_by_id.get(&stream_id) { + None => bail!("no stream {}", stream_id), + Some(s) => s, + }; + let sample_file_dir_id = match s.sample_file_dir_id { + None => bail!("stream {} has no dir", stream_id), + Some(d) => d, + }; let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut rows = stmt.query_named(&[ (":start", &CompositeId::new(stream_id, 0).0), @@ -1292,6 +1163,7 @@ impl LockedDatabase { let duration = recording::Duration(row.get_checked(2)?); let should_continue = f(ListOldestSampleFilesRow{ id, + sample_file_dir_id, time: start .. start + duration, sample_file_bytes: row.get_checked(3)?, }); @@ -1328,7 +1200,7 @@ impl LockedDatabase { sha1.copy_from_slice(&sha1_vec); let data: Vec = row.get_checked(5)?; - self.state.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { id: id as i32, width: row.get_checked::<_, i32>(2)? as u16, height: row.get_checked::<_, i32>(3)? as u16, @@ -1338,7 +1210,7 @@ impl LockedDatabase { })); } info!("Loaded {} video sample entries", - self.state.video_sample_entries.len()); + self.video_sample_entries.len()); Ok(()) } @@ -1368,15 +1240,17 @@ impl LockedDatabase { (None, None) => None, _ => bail!("open table missing id {}", id), }; - self.state.sample_file_dirs_by_id.insert(id, SampleFileDir { + self.sample_file_dirs_by_id.insert(id, SampleFileDir { id, uuid: dir_uuid.0, path: row.get_checked(1)?, dir: None, last_complete_open, + to_gc: Vec::new(), + garbage: raw::list_garbage(&self.conn, id)?, }); } - info!("Loaded {} sample file dirs", self.state.sample_file_dirs_by_id.len()); + info!("Loaded {} sample file dirs", self.sample_file_dirs_by_id.len()); Ok(()) } @@ -1401,7 +1275,7 @@ impl LockedDatabase { let row = row?; let id = row.get_checked(0)?; let uuid: FromSqlUuid = row.get_checked(1)?; - self.state.cameras_by_id.insert(id, Camera { + self.cameras_by_id.insert(id, Camera { id: id, uuid: uuid.0, short_name: row.get_checked(2)?, @@ -1411,9 +1285,9 @@ impl LockedDatabase { password: row.get_checked(6)?, streams: Default::default(), }); - self.state.cameras_by_uuid.insert(uuid.0, id); + self.cameras_by_uuid.insert(uuid.0, id); } - info!("Loaded {} cameras", self.state.cameras_by_id.len()); + info!("Loaded {} cameras", self.cameras_by_id.len()); Ok(()) } @@ -1429,6 +1303,7 @@ impl LockedDatabase { sample_file_dir_id, rtsp_path, retain_bytes, + flush_if_sec, next_recording_id, record from @@ -1442,28 +1317,32 @@ impl LockedDatabase { let type_ = StreamType::parse(&type_).ok_or_else( || format_err!("no such stream type {}", type_))?; let camera_id = row.get_checked(2)?; - let c = self.state + let c = self .cameras_by_id .get_mut(&camera_id) .ok_or_else(|| format_err!("missing camera {} for stream {}", camera_id, id))?; - self.state.streams_by_id.insert(id, Stream { + let flush_if_sec = row.get_checked(6)?; + self.streams_by_id.insert(id, Stream { id, type_, camera_id, sample_file_dir_id: row.get_checked(3)?, rtsp_path: row.get_checked(4)?, retain_bytes: row.get_checked(5)?, + flush_if_sec, + flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, duration: recording::Duration(0), days: BTreeMap::new(), - next_recording_id: row.get_checked(6)?, - record: row.get_checked(7)?, + next_recording_id: row.get_checked(7)?, + record: row.get_checked(8)?, + uncommitted: VecDeque::new(), }); c.streams[type_.index()] = Some(id); } - info!("Loaded {} streams", self.state.streams_by_id.len()); + info!("Loaded {} streams", self.streams_by_id.len()); Ok(()) } @@ -1477,7 +1356,7 @@ impl LockedDatabase { // Check if it already exists. // There shouldn't be too many entries, so it's fine to enumerate everything. - for (&id, v) in &self.state.video_sample_entries { + for (&id, v) in &self.video_sample_entries { if v.sha1 == sha1_bytes { // The width and height should match given that they're also specified within data // and thus included in the just-compared hash. @@ -1499,7 +1378,7 @@ impl LockedDatabase { ])?; let id = self.conn.last_insert_rowid() as i32; - self.state.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { id, width, height, @@ -1515,14 +1394,14 @@ impl LockedDatabase { let mut meta = schema::DirMeta::default(); let uuid = Uuid::new_v4(); let uuid_bytes = &uuid.as_bytes()[..]; - let o = self.state + let o = self .open .as_ref() .ok_or_else(|| format_err!("database is read-only"))?; // Populate meta. { - meta.db_uuid.extend_from_slice(&self.state.uuid.as_bytes()[..]); + meta.db_uuid.extend_from_slice(&self.uuid.as_bytes()[..]); meta.dir_uuid.extend_from_slice(uuid_bytes); let open = meta.mut_in_progress_open(); open.id = o.id; @@ -1530,6 +1409,7 @@ impl LockedDatabase { } let dir = dir::SampleFileDir::create(&path, &meta)?; + // TODO: ensure the dir is empty? let uuid = Uuid::new_v4(); self.conn.execute(r#" insert into sample_file_dir (path, uuid, last_complete_open_id) @@ -1537,7 +1417,7 @@ impl LockedDatabase { "#, &[&path, &uuid_bytes, &o.id])?; let id = self.conn.last_insert_rowid() as i32; use ::std::collections::btree_map::Entry; - let e = self.state.sample_file_dirs_by_id.entry(id); + let e = self.sample_file_dirs_by_id.entry(id); let d = match e { Entry::Vacant(e) => e.insert(SampleFileDir { id, @@ -1545,6 +1425,8 @@ impl LockedDatabase { uuid, dir: Some(dir), last_complete_open: None, + to_gc: Vec::new(), + garbage: FnvHashSet::default(), }), Entry::Occupied(_) => Err(format_err!("duplicate sample file dir id {}", id))?, }; @@ -1555,7 +1437,7 @@ impl LockedDatabase { } pub fn delete_sample_file_dir(&mut self, dir_id: i32) -> Result<(), Error> { - for (&id, s) in self.state.streams_by_id.iter() { + for (&id, s) in self.streams_by_id.iter() { if s.sample_file_dir_id == Some(dir_id) { bail!("can't delete dir referenced by stream {}", id); } @@ -1565,7 +1447,7 @@ impl LockedDatabase { if self.conn.execute("delete from sample_file_dir where id = ?", &[&dir_id])? != 1 { bail!("no such dir {} to remove", dir_id); } - self.state.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!"); + self.sample_file_dirs_by_id.remove(&dir_id).expect("sample file dir should exist!"); Ok(()) } @@ -1590,12 +1472,12 @@ impl LockedDatabase { (":password", &camera.password), ])?; camera_id = tx.last_insert_rowid() as i32; - streams = StreamStateChanger::new(&tx, camera_id, None, &self.state.streams_by_id, + streams = StreamStateChanger::new(&tx, camera_id, None, &self.streams_by_id, &mut camera)?; } tx.commit()?; - let streams = streams.apply(&mut self.state.streams_by_id); - self.state.cameras_by_id.insert(camera_id, Camera { + let streams = streams.apply(&mut self.streams_by_id); + self.cameras_by_id.insert(camera_id, Camera { id: camera_id, uuid, short_name: camera.short_name, @@ -1605,7 +1487,7 @@ impl LockedDatabase { password: camera.password, streams, }); - self.state.cameras_by_uuid.insert(uuid, camera_id); + self.cameras_by_uuid.insert(uuid, camera_id); Ok(camera_id) } @@ -1614,12 +1496,12 @@ impl LockedDatabase { // TODO: sample_file_dir_id. disallow change when data is stored; change otherwise. let tx = self.conn.transaction()?; let streams; - let c = self.state + let c = self .cameras_by_id .get_mut(&camera_id) .ok_or_else(|| format_err!("no such camera {}", camera_id))?; { - streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.state.streams_by_id, + streams = StreamStateChanger::new(&tx, camera_id, Some(c), &self.streams_by_id, &mut camera)?; let mut stmt = tx.prepare_cached(r#" update camera set @@ -1649,20 +1531,20 @@ impl LockedDatabase { c.host = camera.host; c.username = camera.username; c.password = camera.password; - c.streams = streams.apply(&mut self.state.streams_by_id); + c.streams = streams.apply(&mut self.streams_by_id); Ok(()) } /// Deletes a camera and its streams. The camera must have no recordings. pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> { - let uuid = self.state.cameras_by_id.get(&id) + let uuid = self.cameras_by_id.get(&id) .map(|c| c.uuid) .ok_or_else(|| format_err!("No such camera {} to remove", id))?; let mut streams_to_delete = Vec::new(); let tx = self.conn.transaction()?; { let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?; - for (stream_id, stream) in &self.state.streams_by_id { + for (stream_id, stream) in &self.streams_by_id { if stream.camera_id != id { continue }; if stream.range.is_some() { bail!("Can't remove camera {}; has recordings.", id); @@ -1681,12 +1563,47 @@ impl LockedDatabase { } tx.commit()?; for id in streams_to_delete { - self.state.streams_by_id.remove(&id); + self.streams_by_id.remove(&id); } - self.state.cameras_by_id.remove(&id); - self.state.cameras_by_uuid.remove(&uuid); + self.cameras_by_id.remove(&id); + self.cameras_by_uuid.remove(&uuid); return Ok(()) } + + pub fn update_retention(&mut self, changes: &[RetentionChange]) -> Result<(), Error> { + let tx = self.conn.transaction()?; + { + let mut stmt = tx.prepare_cached(r#" + update stream + set + record = :record, + retain_bytes = :retain + where + id = :id + "#)?; + for c in changes { + if c.new_limit < 0 { + bail!("can't set limit for stream {} to {}; must be >= 0", + c.stream_id, c.new_limit); + } + let rows = stmt.execute_named(&[ + (":record", &c.new_record), + (":retain", &c.new_limit), + (":id", &c.stream_id), + ])?; + if rows != 1 { + bail!("no such stream {}", c.stream_id); + } + } + } + tx.commit()?; + for c in changes { + let s = self.streams_by_id.get_mut(&c.stream_id).expect("stream in db but not state"); + s.record = c.new_record; + s.retain_bytes = c.new_limit; + } + Ok(()) + } } /// Gets the schema version from the given database connection. @@ -1706,8 +1623,20 @@ pub fn get_schema_version(conn: &rusqlite::Connection) -> Result, Er /// The recording database. Abstracts away SQLite queries. Also maintains in-memory state /// (loaded on startup, and updated on successful commit) to avoid expensive scans over the /// recording table on common queries. -#[derive(Debug)] -pub struct Database(Mutex); +pub struct Database( + /// This is wrapped in an `Option` to allow the `Drop` implementation and `close` to coexist. + Option> +); + +impl Drop for Database { + fn drop(&mut self) { + if let Some(m) = self.0.take() { + if let Err(e) = m.into_inner().flush("drop") { + error!("Final database flush failed: {}", e); + } + } + } +} impl Database { /// Creates the database from a caller-supplied SQLite connection. @@ -1770,29 +1699,29 @@ impl Database { uuid, }) } else { None }; - let db = Database(Mutex::new(LockedDatabase{ + let db = Database(Some(Mutex::new(LockedDatabase { conn: conn, - state: State { - uuid, - open, - sample_file_dirs_by_id: BTreeMap::new(), - cameras_by_id: BTreeMap::new(), - cameras_by_uuid: BTreeMap::new(), - streams_by_id: BTreeMap::new(), - video_sample_entries: BTreeMap::new(), - video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - list_recordings_by_time_sql: list_recordings_by_time_sql, - }, - })); + uuid, + open, + sample_file_dirs_by_id: BTreeMap::new(), + cameras_by_id: BTreeMap::new(), + cameras_by_uuid: BTreeMap::new(), + streams_by_id: BTreeMap::new(), + video_sample_entries: BTreeMap::new(), + video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), + list_recordings_by_time_sql, + to_delete: Vec::new(), + on_flush: Vec::new(), + }))); { let l = &mut *db.lock(); l.init_video_sample_entries()?; l.init_sample_file_dirs()?; l.init_cameras()?; l.init_streams()?; - for (&stream_id, ref mut stream) in &mut l.state.streams_by_id { + for (&stream_id, ref mut stream) in &mut l.streams_by_id { // TODO: we could use one thread per stream if we had multiple db conns. - let camera = l.state.cameras_by_id.get(&stream.camera_id).unwrap(); + let camera = l.cameras_by_id.get(&stream.camera_id).unwrap(); init_recordings(&mut l.conn, stream_id, camera, stream)?; } } @@ -1816,13 +1745,13 @@ impl Database { /// Locks the database; the returned reference is the only way to perform (read or write) /// operations. - pub fn lock(&self) -> MutexGuard { self.0.lock() } + pub fn lock(&self) -> MutexGuard { self.0.as_ref().unwrap().lock() } - /// For testing. Closes the database and return the connection. This allows verification that - /// a newly opened database is in an acceptable state. + /// For testing: closes the database (without flushing) and returns the connection. + /// This allows verification that a newly opened database is in an acceptable state. #[cfg(test)] - fn close(self) -> rusqlite::Connection { - self.0.into_inner().conn + fn close(mut self) -> rusqlite::Connection { + self.0.take().unwrap().into_inner().conn } } @@ -1992,7 +1921,7 @@ mod tests { #[test] fn test_no_meta_or_version() { testutil::init(); - let e = Database::new(Connection::open_in_memory().unwrap(), false).unwrap_err(); + let e = Database::new(Connection::open_in_memory().unwrap(), false).err().unwrap(); assert!(e.to_string().starts_with("no such table"), "{}", e); } @@ -2001,7 +1930,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); - let e = Database::new(c, false).unwrap_err(); + let e = Database::new(c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 2 is too old (expected 3)"), "got: {:?}", e); } @@ -2011,7 +1940,7 @@ mod tests { testutil::init(); let c = setup_conn(); c.execute_batch("delete from version; insert into version values (4, 0, '');").unwrap(); - let e = Database::new(c, false).unwrap_err(); + let e = Database::new(c, false).err().unwrap(); assert!(e.to_string().starts_with( "Database schema version 4 is too new (expected 3)"), "got: {:?}", e); } @@ -2034,7 +1963,7 @@ mod tests { let db = Database::new(conn, true).unwrap(); let tmpdir = tempdir::TempDir::new("moonfire-nvr-test").unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned(); - let sample_file_dir_id = Some({ db.lock() }.add_sample_file_dir(path).unwrap()); + let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap(); let camera_id = { db.lock() }.add_camera(CameraChange { short_name: "testcam".to_owned(), description: "".to_owned(), @@ -2042,16 +1971,28 @@ mod tests { username: "foo".to_owned(), password: "bar".to_owned(), streams: [ - StreamChange { sample_file_dir_id, rtsp_path: "/main".to_owned(), record: true }, - StreamChange { sample_file_dir_id, rtsp_path: "/sub".to_owned(), record: true }, + StreamChange { + sample_file_dir_id: Some(sample_file_dir_id), + rtsp_path: "/main".to_owned(), + record: true, + flush_if_sec: 0, + }, + StreamChange { + sample_file_dir_id: Some(sample_file_dir_id), + rtsp_path: "/sub".to_owned(), + record: true, + flush_if_sec: 0, + }, ], }).unwrap(); { let mut l = db.lock(); let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap(); - let mut tx = l.tx().unwrap(); - tx.update_retention(stream_id, true, 42).unwrap(); - tx.commit().unwrap(); + l.update_retention(&[super::RetentionChange { + stream_id, + new_record: true, + new_limit: 42, + }]).unwrap(); } let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid }; assert_no_recordings(&db, camera_uuid); @@ -2061,7 +2002,7 @@ mod tests { let db = Database::new(conn, true).unwrap(); assert_no_recordings(&db, camera_uuid); - assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), &[]); + // TODO: assert_eq!(db.lock().list_garbage(sample_file_dir_id).unwrap(), &[]); let vse_id = db.lock().insert_video_sample_entry( 1920, 1080, include_bytes!("testdata/avc1").to_vec(), @@ -2071,9 +2012,7 @@ mod tests { // Inserting a recording should succeed and advance the next recording id. let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); let stream_id = camera_id; // TODO - let id = CompositeId::new(stream_id, 1); let recording = RecordingToInsert { - id, sample_file_bytes: 42, run_offset: 0, flags: 0, @@ -2085,12 +2024,14 @@ mod tests { video_index: [0u8; 100].to_vec(), sample_file_sha1: [0u8; 20], }; - { + let id = { let mut db = db.lock(); - let mut tx = db.tx().unwrap(); - tx.insert_recording(&recording).unwrap(); - tx.commit().unwrap(); - } + let (id, u) = db.add_recording(stream_id).unwrap(); + u.lock().recording = Some(recording.clone()); + u.lock().synced = true; + db.flush("add test").unwrap(); + id + }; assert_eq!(db.lock().streams_by_id().get(&stream_id).unwrap().next_recording_id, 2); // Queries should return the correct result (with caches update on insert). @@ -2108,36 +2049,18 @@ mod tests { let mut v = Vec::new(); db.list_oldest_sample_files(stream_id, |r| { v.push(r); true }).unwrap(); assert_eq!(1, v.len()); - let mut tx = db.tx().unwrap(); - tx.delete_recordings(&v).unwrap(); - tx.commit().unwrap(); + db.delete_recordings(&mut v); + db.flush("delete test").unwrap(); } assert_no_recordings(&db, camera_uuid); - assert_eq!(db.lock().list_garbage(sample_file_dir_id.unwrap()).unwrap(), vec![id]); - } - - #[test] - fn test_drop_tx() { - testutil::init(); - let conn = setup_conn(); - conn.execute("insert into garbage values (1, ?)", &[&CompositeId::new(1, 1).0]).unwrap(); - let db = Database::new(conn, true).unwrap(); - let mut db = db.lock(); - { - let mut tx = db.tx().unwrap(); - tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); - // drop tx without committing. - } - - // The dropped tx should have done nothing. - assert_eq!(db.list_garbage(1).unwrap(), &[CompositeId::new(1, 1)]); - - // Following transactions should succeed. - { - let mut tx = db.tx().unwrap(); - tx.mark_sample_files_deleted(&[CompositeId::new(1, 1)]).unwrap(); - tx.commit().unwrap(); - } - assert_eq!(db.list_garbage(1).unwrap(), &[]); + let g: Vec<_> = db.lock() + .sample_file_dirs_by_id() + .get(&sample_file_dir_id) + .unwrap() + .garbage + .iter() + .map(|&id| id) + .collect(); + assert_eq!(&g, &[id]); } } diff --git a/db/dir.rs b/db/dir.rs index 7e1b685..ce431c7 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -36,6 +36,7 @@ use db::{self, CompositeId}; use failure::{Error, Fail}; use fnv::FnvHashMap; use libc::{self, c_char}; +use parking_lot::Mutex; use protobuf::{self, Message}; use recording; use openssl::hash; @@ -47,7 +48,7 @@ use std::io::{self, Read, Write}; use std::mem; use std::os::unix::ffi::OsStrExt; use std::os::unix::io::FromRawFd; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::sync::mpsc; use std::thread; @@ -62,9 +63,6 @@ pub struct SampleFileDir { /// The open file descriptor for the directory. The worker uses it to create files and sync the /// directory. Other threads use it to open sample files for reading during video serving. fd: Fd, - - // Lock order: don't acquire mutable.lock() while holding db.lock(). - mutable: Mutex, } /// A file descriptor associated with a directory (not necessarily the sample file dir). @@ -199,9 +197,6 @@ impl SampleFileDir { .map_err(|e| format_err!("unable to open sample file dir {}: {}", path, e))?; Ok(Arc::new(SampleFileDir { fd, - mutable: Mutex::new(SharedMutableState{ - next_id_by_stream: FnvHashMap::default(), - }), })) } @@ -258,40 +253,11 @@ impl SampleFileDir { prev: Option, stream_id: i32, video_sample_entry_id: i32) -> Result, Error> { - // Grab the next id. The dir itself will typically have an id (possibly one ahead of what's - // stored in the database), but not on the first attempt for a stream. - use std::collections::hash_map::Entry; - let recording_id; - match self.mutable.lock().unwrap().next_id_by_stream.entry(stream_id) { - Entry::Occupied(mut e) => { - let v = e.get_mut(); - recording_id = *v; - *v += 1; - }, - Entry::Vacant(e) => { - let mut l = db.lock(); - recording_id = l.streams_by_id().get(&stream_id).unwrap().next_recording_id; - e.insert(recording_id + 1); - }, - }; - - let id = CompositeId::new(stream_id, recording_id); + let (id, r) = db.lock().add_recording(stream_id)?; let p = SampleFileDir::get_rel_pathname(id); - - let f = match unsafe { self.fd.openat(p.as_ptr(), - libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, - 0o600) } { - Ok(f) => f, - Err(e) => { - // Put the id back to try again later. - let mut l = self.mutable.lock().unwrap(); - let v = l.next_id_by_stream.get_mut(&stream_id).unwrap(); - assert_eq!(*v, recording_id + 1); - *v -= 1; - return Err(e.into()); - }, - }; - Writer::open(f, id, prev, video_sample_entry_id, channel) + let f = unsafe { self.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, + 0o600) }.unwrap(); // TODO: don't unwrap! + Writer::open(f, id, r, prev, video_sample_entry_id, channel) } pub fn statfs(&self) -> Result { self.fd.statfs() } @@ -325,16 +291,11 @@ impl SampleFileDir { } } -/// State shared between users of the `SampleFileDirectory` struct and the syncer. -#[derive(Debug)] -struct SharedMutableState { - next_id_by_stream: FnvHashMap, -} - /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { - AsyncSaveRecording(db::RecordingToInsert, fs::File), - AsyncAbandonRecording(CompositeId), + AsyncSaveRecording(CompositeId, Arc>, fs::File), + //AsyncAbandonRecording(CompositeId), + DatabaseFlushed, Flush(mpsc::SyncSender<()>), } @@ -345,20 +306,9 @@ pub struct SyncerChannel(mpsc::Sender); /// State of the worker thread. struct Syncer { + dir_id: i32, dir: Arc, db: Arc, - - /// Files to be unlinked then immediately forgotten about. They are `>= next_recording_id` for - /// their stream, `next_recording_id` won't be advanced without a sync of the directory, and - /// extraneous files `>= next_recording_id` are unlinked on startup, so this should be - /// sufficient. - to_abandon: Vec, - - /// Files to be unlinked then removed from the garbage table. - to_unlink: Vec, - - /// Files to be removed from the garbage table. - to_mark_deleted: Vec, } /// Starts a syncer for the given sample file directory. @@ -371,13 +321,23 @@ struct Syncer { /// /// Returns a `SyncerChannel` which can be used to send commands (and can be cloned freely) and /// a `JoinHandle` for the syncer thread. At program shutdown, all `SyncerChannel` clones should be -/// removed and then the handle joined to allow all recordings to be persisted. +/// dropped and then the handle joined to allow all recordings to be persisted. +/// +/// Note that dropping all `SyncerChannel` clones currently includes calling +/// `LockedDatabase::clear_on_flush`, as this function installs a hook to watch database flushes. +/// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically. pub fn start_syncer(db: Arc, dir_id: i32) -> Result<(SyncerChannel, thread::JoinHandle<()>), Error> { let db2 = db.clone(); let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.initial_rotation()?; let (snd, rcv) = mpsc::channel(); + db.lock().on_flush(Box::new({ + let snd = snd.clone(); + move || if let Err(e) = snd.send(SyncerCommand::DatabaseFlushed) { + warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e); + } + })); Ok((SyncerChannel(snd), thread::Builder::new() .name(format!("sync-{}", path)) @@ -440,13 +400,14 @@ fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32, impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - fn async_save_recording(&self, recording: db::RecordingToInsert, f: fs::File) { - self.0.send(SyncerCommand::AsyncSaveRecording(recording, f)).unwrap(); + fn async_save_recording(&self, id: CompositeId, recording: Arc>, + f: fs::File) { + self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap(); } - fn async_abandon_recording(&self, id: CompositeId) { - self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); - } + //fn async_abandon_recording(&self, id: CompositeId) { + // self.0.send(SyncerCommand::AsyncAbandonRecording(id)).unwrap(); + //} /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. pub fn flush(&self) { @@ -463,9 +424,8 @@ impl Syncer { .get(&dir_id) .ok_or_else(|| format_err!("no dir {}", dir_id))?; let dir = d.get()?; - let to_unlink = l.list_garbage(dir_id)?; - // Get files to abandon. + // Abandon files. // First, get a list of the streams in question. let streams_to_next: FnvHashMap<_, _> = l.streams_by_id() @@ -479,13 +439,25 @@ impl Syncer { }) .collect(); let to_abandon = Syncer::list_files_to_abandon(&d.path, streams_to_next)?; + let mut undeletable = 0; + for &id in &to_abandon { + if let Err(e) = SampleFileDir::unlink(&dir.fd, id) { + if e.kind() == io::ErrorKind::NotFound { + warn!("dir: abandoned recording {} already deleted!", id); + } else { + warn!("dir: Unable to unlink abandoned recording {}: {}", id, e); + undeletable += 1; + } + } + } + if undeletable > 0 { + bail!("Unable to delete {} abandoned recordings.", undeletable); + } Ok((Syncer { + dir_id, dir, db, - to_abandon, - to_unlink, - to_mark_deleted: Vec::new(), }, d.path.clone())) } @@ -515,8 +487,9 @@ impl Syncer { loop { match cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown - Ok(SyncerCommand::AsyncSaveRecording(recording, f)) => self.save(recording, f), - Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), + Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f), + //Ok(SyncerCommand::AsyncAbandonRecording(uuid)) => self.abandon(uuid), + Ok(SyncerCommand::DatabaseFlushed) => { let _ = self.collect_garbage(true); }, Ok(SyncerCommand::Flush(_)) => {}, // just drop the supplied sender, closing it. }; } @@ -535,106 +508,102 @@ impl Syncer { fn do_rotation(&mut self, get_rows_to_delete: F) -> Result<(), Error> where F: FnOnce(&db::LockedDatabase) -> Result, Error> { - let to_delete = { - let mut db = self.db.lock(); - let to_delete = get_rows_to_delete(&*db)?; - let mut tx = db.tx()?; - tx.delete_recordings(&to_delete)?; - tx.commit()?; - to_delete - }; - for row in to_delete { - self.to_unlink.push(row.id); - } - self.try_unlink(); - if !self.to_unlink.is_empty() { - bail!("failed to unlink {} sample files", self.to_unlink.len()); - } - self.dir.sync()?; { let mut db = self.db.lock(); - let mut tx = db.tx()?; - tx.mark_sample_files_deleted(&self.to_mark_deleted)?; - tx.commit()?; + let mut to_delete = get_rows_to_delete(&*db)?; + db.delete_recordings(&mut to_delete); + db.flush("synchronous deletion")?; } - self.to_mark_deleted.clear(); - Ok(()) + self.collect_garbage(false)?; + self.db.lock().flush("synchronous garbage collection") + } + + fn collect_garbage(&mut self, warn_on_missing: bool) -> Result<(), Error> { + let mut garbage: Vec<_> = { + let l = self.db.lock(); + let d = match l.sample_file_dirs_by_id().get(&self.dir_id) { + None => { + error!("can't find dir {} in db!", self.dir_id); + bail!("can't find dir {} in db!", self.dir_id); + }, + Some(d) => d, + }; + d.garbage.iter().map(|id| *id).collect() + }; + let len_before = garbage.len(); + garbage.retain(|&id| { + if let Err(e) = SampleFileDir::unlink(&self.dir.fd, id) { + if e.kind() == io::ErrorKind::NotFound { + if warn_on_missing { + warn!("dir: recording {} already deleted!", id); + } + } else { + warn!("dir: Unable to unlink {}: {}", id, e); + return false; + } + } + true + }); + let res = if len_before > garbage.len() { + Err(format_err!("Unable to unlink {} files", len_before - garbage.len())) + } else { + Ok(()) + }; + if garbage.is_empty() { + // No progress. + return res; + } + if let Err(e) = self.dir.sync() { + error!("unable to sync dir: {}", e); + return res.and(Err(e.into())); + } + if let Err(e) = self.db.lock().delete_garbage(self.dir_id, &mut garbage) { + error!("unable to delete garbage ({} files) for dir {}: {}", + self.dir_id, garbage.len(), e); + return res.and(Err(e.into())); + } + res } /// Saves the given recording and causes rotation to happen. /// Note that part of rotation is deferred for the next cycle (saved writing or program startup) /// so that there can be only one dir sync and database transaction per save. - fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { - if let Err(e) = self.save_helper(&recording, f) { - error!("will discard recording {} due to error while saving: {}", recording.id, e); - self.abandon(recording.id); - return; - } - } - - fn abandon(&mut self, id: CompositeId) { - self.to_abandon.push(id); - self.try_unlink(); - } - /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save_helper(&mut self, recording: &db::RecordingToInsert, f: fs::File) - -> Result<(), Error> { - self.try_unlink(); - if !self.to_unlink.is_empty() { - bail!("failed to unlink {} files.", self.to_unlink.len()); - } + /// TODO: less unwrapping! keep a queue? + fn save(&mut self, id: CompositeId, recording: Arc>, + f: fs::File) { + let stream_id = id.stream(); - // XXX: if these calls fail, any other writes are likely to fail as well. - f.sync_all()?; - self.dir.sync()?; - - let mut to_delete = Vec::new(); - let mut db = self.db.lock(); + // Free up a like number of bytes. { - let stream_id = recording.id.stream(); - let stream = - db.streams_by_id().get(&stream_id) - .ok_or_else(|| format_err!("no such stream {}", stream_id))?; - get_rows_to_delete(&db, stream_id, stream, - recording.sample_file_bytes as i64, &mut to_delete)?; + let mut to_delete = Vec::new(); + let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64; + let mut db = self.db.lock(); + { + let stream = db.streams_by_id().get(&stream_id).unwrap(); + get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap(); + } + db.delete_recordings(&mut to_delete); } - let mut tx = db.tx()?; - tx.mark_sample_files_deleted(&self.to_mark_deleted)?; - tx.delete_recordings(&to_delete)?; - tx.insert_recording(recording)?; - tx.commit()?; - self.to_mark_deleted.clear(); - self.to_unlink.extend(to_delete.iter().map(|row| row.id)); - self.to_unlink.extend_from_slice(&self.to_abandon); - self.to_abandon.clear(); - Ok(()) - } - - /// Tries to unlink all the files in `self.to_unlink` and `self.to_abandon`. - /// Any which can't be unlinked will be retained in the vec. - fn try_unlink(&mut self) { - let to_mark_deleted = &mut self.to_mark_deleted; - let fd = &self.dir.fd; - for &mut (ref mut v, mark_deleted) in &mut [(&mut self.to_unlink, true), - (&mut self.to_abandon, false)] { - v.retain(|&id| { - if let Err(e) = SampleFileDir::unlink(fd, id) { - if e.kind() == io::ErrorKind::NotFound { - warn!("dir: recording {} already deleted!", id); - } else { - warn!("dir: Unable to unlink {}: {}", id, e); - return true; - } - } - if mark_deleted { - to_mark_deleted.push(id); - } - false - }); - } + f.sync_all().unwrap(); + self.dir.sync().unwrap(); + recording.lock().synced = true; + let mut db = self.db.lock(); + let reason = { + let s = db.streams_by_id().get(&stream_id).unwrap(); + let c = db.cameras_by_id().get(&s.camera_id).unwrap(); + let unflushed = s.unflushed(); + if unflushed < s.flush_if { + debug!("{}-{}: unflushed={} < if={}, not flushing", + c.short_name, s.type_.as_str(), unflushed, s.flush_if); + return; + } + format!("{}-{}: unflushed={} >= if={}", + c.short_name, s.type_.as_str(), unflushed, s.flush_if) + }; + let _ = db.flush(&reason); } } @@ -651,6 +620,7 @@ pub struct Writer<'a>(Option>); struct InnerWriter<'a> { syncer_channel: &'a SyncerChannel, f: fs::File, + r: Arc>, index: recording::SampleIndexEncoder, id: CompositeId, corrupt: bool, @@ -744,11 +714,13 @@ pub struct PreviousWriter { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, id: CompositeId, prev: Option, + fn open(f: fs::File, id: CompositeId, r: Arc>, + prev: Option, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { Ok(Writer(Some(InnerWriter { syncer_channel, f, + r, index: recording::SampleIndexEncoder::new(), id, corrupt: false, @@ -784,6 +756,7 @@ impl<'a> Writer<'a> { Err(e) => { if remaining.len() < pkt.len() { // Partially written packet. Truncate if possible. + // TODO: have the syncer do this / retry it if necessary? if let Err(e2) = w.f.set_len(w.index.sample_file_bytes as u64) { error!("After write to {} failed with {}, truncate failed with {}; \ sample file is corrupt.", w.id, e, e2); @@ -820,7 +793,7 @@ impl<'a> InnerWriter<'a> { fn close(mut self, next_pts: Option) -> Result { if self.corrupt { - self.syncer_channel.async_abandon_recording(self.id); + //self.syncer_channel.async_abandon_recording(self.id); bail!("recording {} is corrupt", self.id); } let unflushed = @@ -839,8 +812,7 @@ impl<'a> InnerWriter<'a> { let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } else { 0 }; let local_start_delta = self.local_start - start; - let recording = db::RecordingToInsert{ - id: self.id, + let recording = db::RecordingToInsert { sample_file_bytes: self.index.sample_file_bytes, time: start .. end, local_time_delta: local_start_delta, @@ -852,7 +824,8 @@ impl<'a> InnerWriter<'a> { run_offset: self.run_offset, flags: flags, }; - self.syncer_channel.async_save_recording(recording, self.f); + self.r.lock().recording = Some(recording); + self.syncer_channel.async_save_recording(self.id, self.r, self.f); Ok(PreviousWriter{ end_time: end, local_time_delta: local_start_delta, diff --git a/db/lib.rs b/db/lib.rs index e718042..3e4f263 100644 --- a/db/lib.rs +++ b/db/lib.rs @@ -48,6 +48,7 @@ extern crate uuid; mod coding; pub mod db; pub mod dir; +mod raw; pub mod recording; pub mod schema; diff --git a/db/raw.rs b/db/raw.rs new file mode 100644 index 0000000..a62d29d --- /dev/null +++ b/db/raw.rs @@ -0,0 +1,203 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2018 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Raw database access: SQLite statements which do not touch any cached state. + +use db::{self, CompositeId}; +use failure::Error; +use fnv::FnvHashSet; +use recording; +use rusqlite; +use std::ops::Range; + +const INSERT_RECORDING_SQL: &'static str = r#" + insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes, + start_time_90k, duration_90k, local_time_delta_90k, video_samples, + video_sync_samples, video_sample_entry_id) + values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes, + :start_time_90k, :duration_90k, :local_time_delta_90k, + :video_samples, :video_sync_samples, :video_sample_entry_id) +"#; + +const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" + insert into recording_playback (composite_id, sample_file_sha1, video_index) + values (:composite_id, :sample_file_sha1, :video_index) +"#; + +const STREAM_MIN_START_SQL: &'static str = r#" + select + start_time_90k + from + recording + where + stream_id = :stream_id + order by start_time_90k limit 1 +"#; + +const STREAM_MAX_START_SQL: &'static str = r#" + select + start_time_90k, + duration_90k + from + recording + where + stream_id = :stream_id + order by start_time_90k desc; +"#; + +/// Inserts the specified recording (for from `try_flush` only). +pub(crate) fn insert_recording(tx: &rusqlite::Transaction, id: CompositeId, + r: &db::RecordingToInsert) -> Result<(), Error> { + if r.time.end < r.time.start { + bail!("end time {} must be >= start time {}", r.time.end, r.time.start); + } + + let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)?; + stmt.execute_named(&[ + (":composite_id", &id.0), + (":stream_id", &(id.stream() as i64)), + (":run_offset", &r.run_offset), + (":flags", &r.flags), + (":sample_file_bytes", &r.sample_file_bytes), + (":start_time_90k", &r.time.start.0), + (":duration_90k", &(r.time.end.0 - r.time.start.0)), + (":local_time_delta_90k", &r.local_time_delta.0), + (":video_samples", &r.video_samples), + (":video_sync_samples", &r.video_sync_samples), + (":video_sample_entry_id", &r.video_sample_entry_id), + ])?; + + let mut stmt = tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; + let sha1 = &r.sample_file_sha1[..]; + stmt.execute_named(&[ + (":composite_id", &id.0), + (":sample_file_sha1", &sha1), + (":video_index", &r.video_index), + ])?; + Ok(()) +} + +/// Deletes the given recordings from the `recording` and `recording_playback` tables. +/// Note they are not fully removed from the database; the ids are transferred to the +/// `garbage` table. +pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, rows: &[db::ListOldestSampleFilesRow]) + -> Result<(), Error> { + let mut del1 = tx.prepare_cached( + "delete from recording_playback where composite_id = :composite_id")?; + let mut del2 = tx.prepare_cached( + "delete from recording where composite_id = :composite_id")?; + let mut insert = tx.prepare_cached(r#" + insert into garbage (sample_file_dir_id, composite_id) + values (:sample_file_dir_id, :composite_id) + "#)?; + for row in rows { + let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?; + if changes != 1 { + bail!("no such recording_playback {}", row.id); + } + let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?; + if changes != 1 { + bail!("no such recording {}", row.id); + } + insert.execute_named(&[ + (":sample_file_dir_id", &row.sample_file_dir_id), + (":composite_id", &row.id.0)], + )?; + } + Ok(()) +} + +/// Marks the given sample files as deleted. This shouldn't be called until the files have +/// been `unlink()`ed and the parent directory `fsync()`ed. +pub(crate) fn mark_sample_files_deleted(tx: &rusqlite::Transaction, ids: &[CompositeId]) + -> Result<(), Error> { + if ids.is_empty() { return Ok(()); } + let mut stmt = tx.prepare_cached("delete from garbage where composite_id = ?")?; + for &id in ids { + let changes = stmt.execute(&[&id.0])?; + if changes != 1 { + bail!("no garbage row for {}", id); + } + } + Ok(()) +} + +/// Gets the time range of recordings for the given stream. +pub(crate) fn get_range(conn: &rusqlite::Connection, stream_id: i32) + -> Result>, Error> { + // The minimum is straightforward, taking advantage of the start_time_90k index. + let mut stmt = conn.prepare_cached(STREAM_MIN_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; + let min_start = match rows.next() { + Some(row) => recording::Time(row?.get_checked(0)?), + None => return Ok(None), + }; + + // There was a minimum, so there should be a maximum too. Calculating it is less + // straightforward because recordings could overlap. All recordings starting in the + // last MAX_RECORDING_DURATION must be examined in order to take advantage of the + // start_time_90k index. + let mut stmt = conn.prepare_cached(STREAM_MAX_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; + let mut maxes_opt = None; + while let Some(row) = rows.next() { + let row = row?; + let row_start = recording::Time(row.get_checked(0)?); + let row_duration: i64 = row.get_checked(1)?; + let row_end = recording::Time(row_start.0 + row_duration); + let maxes = match maxes_opt { + None => row_start .. row_end, + Some(Range{start: s, end: e}) => s .. ::std::cmp::max(e, row_end), + }; + if row_start.0 <= maxes.start.0 - recording::MAX_RECORDING_DURATION { + break; + } + maxes_opt = Some(maxes); + } + let max_end = match maxes_opt { + Some(Range{start: _, end: e}) => e, + None => bail!("missing max for stream {} which had min {}", stream_id, min_start), + }; + Ok(Some(min_start .. max_end)) +} + +/// Lists all garbage ids for the given sample file directory. +pub(crate) fn list_garbage(conn: &rusqlite::Connection, dir_id: i32) + -> Result, Error> { + let mut garbage = FnvHashSet::default(); + let mut stmt = conn.prepare_cached( + "select composite_id from garbage where sample_file_dir_id = ?")?; + let mut rows = stmt.query(&[&dir_id])?; + while let Some(row) = rows.next() { + let row = row?; + garbage.insert(CompositeId(row.get_checked(0)?)); + } + Ok(garbage) +} diff --git a/db/schema.sql b/db/schema.sql index a8c87ea..97f5c13 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -108,6 +108,11 @@ create table stream ( -- file. Older files will be deleted as necessary to stay within this limit. retain_bytes integer not null check (retain_bytes >= 0), + -- Flush the database when completing a recording if this stream has at + -- least this many seconds of unflushed recordings. A value of 0 means that + -- every completed recording will cause a flush. + flush_if_sec integer not null, + -- The low 32 bits of the next recording id to assign for this stream. -- Typically this is the maximum current recording + 1, but it does -- not decrease if that recording is deleted. diff --git a/db/testutil.rs b/db/testutil.rs index 2c54ca5..b0d1450 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -86,28 +86,29 @@ impl TestDb { let dir; { let mut l = db.lock(); - { - sample_file_dir_id = l.add_sample_file_dir(path.to_owned()).unwrap(); - assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange { - short_name: "test camera".to_owned(), - description: "".to_owned(), - host: "test-camera".to_owned(), - username: "foo".to_owned(), - password: "bar".to_owned(), - streams: [ - db::StreamChange { - sample_file_dir_id: Some(sample_file_dir_id), - rtsp_path: "/main".to_owned(), - record: true, - }, - Default::default(), - ], - }).unwrap()); - test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid; - let mut tx = l.tx().unwrap(); - tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap(); - tx.commit().unwrap(); - } + sample_file_dir_id = l.add_sample_file_dir(path.to_owned()).unwrap(); + assert_eq!(TEST_CAMERA_ID, l.add_camera(db::CameraChange { + short_name: "test camera".to_owned(), + description: "".to_owned(), + host: "test-camera".to_owned(), + username: "foo".to_owned(), + password: "bar".to_owned(), + streams: [ + db::StreamChange { + sample_file_dir_id: Some(sample_file_dir_id), + rtsp_path: "/main".to_owned(), + record: true, + flush_if_sec: 0, + }, + Default::default(), + ], + }).unwrap()); + test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid; + l.update_retention(&[db::RetentionChange { + stream_id: TEST_STREAM_ID, + new_record: true, + new_limit: 1048576, + }]).unwrap(); dir = l.sample_file_dirs_by_id().get(&sample_file_dir_id).unwrap().get().unwrap(); } let mut dirs_by_stream_id = FnvHashMap::default(); @@ -129,28 +130,25 @@ impl TestDb { let mut db = self.db.lock(); let video_sample_entry_id = db.insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); - let next = db.streams_by_id().get(&TEST_STREAM_ID).unwrap().next_recording_id; - { - let mut tx = db.tx().unwrap(); - const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - tx.insert_recording(&db::RecordingToInsert { - id: db::CompositeId::new(TEST_STREAM_ID, next), - sample_file_bytes: encoder.sample_file_bytes, - time: START_TIME .. - START_TIME + recording::Duration(encoder.total_duration_90k as i64), - local_time_delta: recording::Duration(0), - video_samples: encoder.video_samples, - video_sync_samples: encoder.video_sync_samples, - video_sample_entry_id: video_sample_entry_id, - video_index: encoder.video_index, - sample_file_sha1: [0u8; 20], - run_offset: 0, - flags: db::RecordingFlags::TrailingZero as i32, - }).unwrap(); - tx.commit().unwrap(); - } + const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); + let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap(); + u.lock().recording = Some(db::RecordingToInsert { + sample_file_bytes: encoder.sample_file_bytes, + time: START_TIME .. + START_TIME + recording::Duration(encoder.total_duration_90k as i64), + local_time_delta: recording::Duration(0), + video_samples: encoder.video_samples, + video_sync_samples: encoder.video_sync_samples, + video_sample_entry_id: video_sample_entry_id, + video_index: encoder.video_index, + sample_file_sha1: [0u8; 20], + run_offset: 0, + flags: db::RecordingFlags::TrailingZero as i32, + }); + u.lock().synced = true; + db.flush("create_recording_from_encoder").unwrap(); let mut row = None; - db.list_recordings_by_id(TEST_STREAM_ID, next .. next+1, + db.list_recordings_by_id(TEST_STREAM_ID, id.recording() .. id.recording()+1, |r| { row = Some(r); Ok(()) }).unwrap(); row.unwrap() } diff --git a/guide/schema.md b/guide/schema.md index daf82d2..1e21bee 100644 --- a/guide/schema.md +++ b/guide/schema.md @@ -207,6 +207,8 @@ is never used. Version 3 adds over version 1: * recording of sub streams (splits a new `stream` table out of `camera`) +* a per-stream knob `flush_if_sec` meant to reduce database commits (and + thus SSD write cycles). This improves practicality of many streams. * support for multiple sample file directories, to take advantage of multiple hard drives (or multiple RAID volumes). * an interlock between database and sample file directories to avoid various diff --git a/src/clock.rs b/src/clock.rs index ef60898..7e547dd 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -31,7 +31,7 @@ //! Clock interface and implementations for testability. use libc; -#[cfg(test)] use std::sync::Mutex; +#[cfg(test)] use parking_lot::Mutex; use std::mem; use std::thread; use time::{Duration, Timespec}; @@ -123,12 +123,12 @@ impl SimulatedClocks { #[cfg(test)] impl Clocks for SimulatedClocks { - fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock().unwrap() } - fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock().unwrap() } + fn realtime(&self) -> Timespec { self.boot + *self.uptime.lock() } + fn monotonic(&self) -> Timespec { Timespec::new(0, 0) + *self.uptime.lock() } /// Advances the clock by the specified amount without actually sleeping. fn sleep(&self, how_long: Duration) { - let mut l = self.uptime.lock().unwrap(); + let mut l = self.uptime.lock(); *l = *l + how_long; } } diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs index f6358b7..1346ab9 100644 --- a/src/cmds/config/cameras.rs +++ b/src/cmds/config/cameras.rs @@ -36,6 +36,7 @@ use self::cursive::views; use db::{self, dir}; use failure::Error; use std::collections::BTreeMap; +use std::str::FromStr; use std::sync::Arc; use stream::{self, Opener, Stream}; use super::{decode_size, encode_size}; @@ -62,6 +63,9 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange { .unwrap().get_content().as_str().into(); let r = siv.find_id::(&format!("{}_record", t.as_str())) .unwrap().is_checked(); + let f = i64::from_str(siv.find_id::( + &format!("{}_flush_if_sec", t.as_str())).unwrap().get_content().as_str()) + .unwrap_or(0); let d = *siv.find_id::>>( &format!("{}_sample_file_dir", t.as_str())) .unwrap().selection(); @@ -69,6 +73,7 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange { rtsp_path: p, sample_file_dir_id: d, record: r, + flush_if_sec: f, }; } c @@ -270,9 +275,11 @@ fn edit_camera_dialog(db: &Arc, siv: &mut Cursive, item: &Option, siv: &mut Cursive, item: &Option>| v.set_selection(selected_dir)); diff --git a/src/cmds/config/dirs.rs b/src/cmds/config/dirs.rs index 0b51a9a..7a31280 100644 --- a/src/cmds/config/dirs.rs +++ b/src/cmds/config/dirs.rs @@ -60,12 +60,15 @@ struct Model { /// Updates the limits in the database. Doesn't delete excess data (if any). fn update_limits_inner(model: &Model) -> Result<(), Error> { - let mut db = model.db.lock(); - let mut tx = db.tx()?; - for (&id, stream) in &model.streams { - tx.update_retention(id, stream.record, stream.retain.unwrap())?; + let mut changes = Vec::with_capacity(model.streams.len()); + for (&stream_id, stream) in &model.streams { + changes.push(db::RetentionChange { + stream_id, + new_record: stream.record, + new_limit: stream.retain.unwrap(), + }); } - tx.commit() + model.db.lock().update_retention(&changes) } fn update_limits(model: &Model, siv: &mut Cursive) { diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 34dd96b..73cee32 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -199,6 +199,9 @@ pub fn run() -> Result<(), Error> { } if let Some(mut ss) = syncers { + // The syncers shut down when all channels to them have been dropped. + // The database maintains one; and `ss` holds one. Drop both. + db.lock().clear_on_flush(); for (_, s) in ss.drain() { drop(s.channel); s.join.join().unwrap(); diff --git a/src/cmds/upgrade/v1_to_v2.rs b/src/cmds/upgrade/v1_to_v2.rs index d672a97..ff908bb 100644 --- a/src/cmds/upgrade/v1_to_v2.rs +++ b/src/cmds/upgrade/v1_to_v2.rs @@ -159,6 +159,7 @@ impl<'a> super::Upgrader for U<'a> { record integer not null check (record in (1, 0)), rtsp_path text not null, retain_bytes integer not null check (retain_bytes >= 0), + flush_if_sec integer not null, next_recording_id integer not null check (next_recording_id >= 0), unique (camera_id, type) ); @@ -227,6 +228,7 @@ impl<'a> super::Upgrader for U<'a> { 1, old_camera.main_rtsp_path, old_camera.retain_bytes, + 0, old_camera.next_recording_id from old_camera cross join sample_file_dir; @@ -241,7 +243,8 @@ impl<'a> super::Upgrader for U<'a> { 0, old_camera.sub_rtsp_path, 0, - 0 + 60, + 1 from old_camera cross join sample_file_dir where diff --git a/src/json.rs b/src/json.rs index ba60d21..cc9ca40 100644 --- a/src/json.rs +++ b/src/json.rs @@ -34,7 +34,7 @@ use serde::ser::{SerializeMap, SerializeSeq, Serializer}; use std::collections::BTreeMap; use uuid::Uuid; -#[derive(Debug, Serialize)] +#[derive(Serialize)] #[serde(rename_all="camelCase")] pub struct TopLevel<'a> { pub time_zone_name: &'a str, diff --git a/src/mp4.rs b/src/mp4.rs index 72a8fed..69feeca 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -2105,6 +2105,7 @@ mod tests { const EXPECTED_ETAG: &'static str = "c56ef7eb3b4a713ceafebc3dc7958bd9e62a2fae"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); + db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); } @@ -2125,6 +2126,7 @@ mod tests { const EXPECTED_ETAG: &'static str = "3bdc2c8ce521df50155d0ca4d7497ada448fa7c3"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); + db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); } @@ -2145,6 +2147,7 @@ mod tests { const EXPECTED_ETAG: &'static str = "3986d3bd9b866c3455fb7359fb134aa2d9107af7"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); + db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); } @@ -2165,6 +2168,7 @@ mod tests { const EXPECTED_ETAG: &'static str = "9e789398c9a71ca834fec8fbc55b389f99d12dda"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); + db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); } } diff --git a/src/streamer.rs b/src/streamer.rs index a5ec5d3..9991725 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -205,8 +205,9 @@ mod tests { use failure::Error; use h264; use moonfire_ffmpeg; + use parking_lot::Mutex; use std::cmp; - use std::sync::{Arc, Mutex}; + use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use stream::{self, Opener, Stream}; use time; @@ -290,7 +291,7 @@ mod tests { stream::Source::Rtsp(url) => assert_eq!(url, &self.expected_url), stream::Source::File(_) => panic!("expected rtsp url"), }; - let mut l = self.streams.lock().unwrap(); + let mut l = self.streams.lock(); match l.pop() { Some(stream) => { trace!("MockOpener returning next stream"); @@ -361,7 +362,7 @@ mod tests { testutil::TEST_STREAM_ID, camera, s, 0, 3); } stream.run(); - assert!(opener.streams.lock().unwrap().is_empty()); + assert!(opener.streams.lock().is_empty()); db.syncer_channel.flush(); let db = db.db.lock();