From b17761e8718f9bf27b35d9a209db2abf297c2c41 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 1 Mar 2018 20:59:05 -0800 Subject: [PATCH] move list_recordings_by_* logic into raw.rs I want to start having the db.rs version augment this with the uncommitted recordings, and it's nice to have the separation of the raw db vs augmented versions. Also, this fits with the general theme of shrinking db.rs a bit. I had to put the raw video_sample_entry_id into the rows rather than the video_sample_entry Arc. In hindsight, this is better anyway: the common callers don't need to do the btree lookup and arc clone on every row. I think I'd originally done it that way only because I was quite new to rust and didn't understand that db could be used from within the row callback given that both borrows are immutable. --- db/db.rs | 115 +++++++----------------------------------------- db/raw.rs | 88 ++++++++++++++++++++++++++++++++++++ db/recording.rs | 4 +- src/mp4.rs | 5 ++- src/web.rs | 9 ++-- 5 files changed, 115 insertions(+), 106 deletions(-) diff --git a/db/db.rs b/db/db.rs index 40e7ecc..53f5b1c 100644 --- a/db/db.rs +++ b/db/db.rs @@ -63,7 +63,6 @@ use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite; use schema; use std::collections::{BTreeMap, VecDeque}; -use std::collections::btree_map; use std::cell::RefCell; use std::cmp; use std::io::Write; @@ -96,26 +95,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = "update stream set next_recording_id = :next_recording_id where id = :stream_id"; -const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" - select - recording.composite_id, - recording.run_offset, - recording.flags, - recording.start_time_90k, - recording.duration_90k, - recording.sample_file_bytes, - recording.video_samples, - recording.video_sync_samples, - recording.video_sample_entry_id - from - recording - where - :start <= composite_id and - composite_id < :end - order by - recording.composite_id -"#; - pub struct FromSqlUuid(pub Uuid); impl rusqlite::types::FromSql for FromSqlUuid { @@ -155,7 +134,7 @@ pub struct VideoSampleEntry { #[derive(Debug)] pub struct ListRecordingsRow { pub start: recording::Time, - pub video_sample_entry: Arc, + pub video_sample_entry_id: i32, pub id: CompositeId, @@ -176,7 +155,7 @@ pub struct ListAggregatedRecordingsRow { pub video_samples: i64, pub video_sync_samples: i64, pub sample_file_bytes: i64, - pub video_sample_entry: Arc, + pub video_sample_entry_id: i32, pub stream_id: i32, pub flags: i32, pub run_start_id: i32, @@ -578,8 +557,7 @@ pub struct LockedDatabase { cameras_by_id: BTreeMap, streams_by_id: BTreeMap, cameras_by_uuid: BTreeMap, // values are ids. - video_sample_entries: BTreeMap>, - list_recordings_by_time_sql: String, + video_sample_entries_by_id: BTreeMap>, video_index_cache: RefCell, fnv::FnvBuildHasher>>, on_flush: Vec>, } @@ -1010,8 +988,8 @@ impl LockedDatabase { pub fn streams_by_id(&self) -> &BTreeMap { &self.streams_by_id } /// Returns an immutable view of the video sample entries. - pub fn video_sample_entries(&self) -> btree_map::Values> { - self.video_sample_entries.values() + pub fn video_sample_entries_by_id(&self) -> &BTreeMap> { + &self.video_sample_entries_by_id } /// Gets a given camera by uuid. @@ -1027,52 +1005,14 @@ impl LockedDatabase { pub fn list_recordings_by_time( &self, stream_id: i32, desired_time: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(&self.list_recordings_by_time_sql)?; - let rows = stmt.query_named(&[ - (":stream_id", &stream_id), - (":start_time_90k", &desired_time.start.0), - (":end_time_90k", &desired_time.end.0)])?; - self.list_recordings_inner(rows, f) + raw::list_recordings_by_time(&self.conn, stream_id, desired_time, f) } - /// Lists the specified recordigs in ascending order by id. + /// Lists the specified recordings in ascending order by id. pub fn list_recordings_by_id( &self, stream_id: i32, desired_ids: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; - let rows = stmt.query_named(&[ - (":start", &CompositeId::new(stream_id, desired_ids.start).0), - (":end", &CompositeId::new(stream_id, desired_ids.end).0), - ])?; - self.list_recordings_inner(rows, f) - } - - fn list_recordings_inner( - &self, mut rows: rusqlite::Rows, - f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - while let Some(row) = rows.next() { - let row = row?; - let id = CompositeId(row.get_checked::<_, i64>(0)?); - let vse_id = row.get_checked(8)?; - let video_sample_entry = match self.video_sample_entries.get(&vse_id) { - Some(v) => v, - None => bail!("recording {} references nonexistent video_sample_entry {}", - id, vse_id), - }; - let out = ListRecordingsRow { - id, - run_offset: row.get_checked(1)?, - flags: row.get_checked(2)?, - start: recording::Time(row.get_checked(3)?), - duration_90k: row.get_checked(4)?, - sample_file_bytes: row.get_checked(5)?, - video_samples: row.get_checked(6)?, - video_sync_samples: row.get_checked(7)?, - video_sample_entry: video_sample_entry.clone(), - }; - f(out)?; - } - Ok(()) + raw::list_recordings_by_id(&self.conn, stream_id, desired_ids, f) } /// Calls `list_recordings_by_time` and aggregates consecutive recordings. @@ -1103,7 +1043,7 @@ impl LockedDatabase { let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let new_dur = a.time.end - a.time.start + recording::Duration(row.duration_90k as i64); - a.ids.end != recording_id || row.video_sample_entry.id != a.video_sample_entry.id || + a.ids.end != recording_id || row.video_sample_entry_id != a.video_sample_entry_id || new_dur >= forced_split } else { false @@ -1133,7 +1073,7 @@ impl LockedDatabase { video_samples: row.video_samples as i64, video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, - video_sample_entry: row.video_sample_entry, + video_sample_entry_id: row.video_sample_entry_id, stream_id, run_start_id, flags: row.flags, @@ -1219,7 +1159,7 @@ impl LockedDatabase { sha1.copy_from_slice(&sha1_vec); let data: Vec = row.get_checked(5)?; - self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry { id: id as i32, width: row.get_checked::<_, i32>(2)? as u16, height: row.get_checked::<_, i32>(3)? as u16, @@ -1229,7 +1169,7 @@ impl LockedDatabase { })); } info!("Loaded {} video sample entries", - self.video_sample_entries.len()); + self.video_sample_entries_by_id.len()); Ok(()) } @@ -1379,7 +1319,7 @@ impl LockedDatabase { // Check if it already exists. // There shouldn't be too many entries, so it's fine to enumerate everything. - for (&id, v) in &self.video_sample_entries { + for (&id, v) in &self.video_sample_entries_by_id { if v.sha1 == sha1_bytes { // The width and height should match given that they're also specified within data // and thus included in the just-compared hash. @@ -1401,7 +1341,7 @@ impl LockedDatabase { ])?; let id = self.conn.last_insert_rowid() as i32; - self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry { id, width, height, @@ -1707,27 +1647,6 @@ impl Database { // Note: the meta check comes after the version check to improve the error message when // trying to open a version 0 or version 1 database (which lacked the meta table). let uuid = raw::get_db_uuid(&conn)?; - let list_recordings_by_time_sql = format!(r#" - select - recording.composite_id, - recording.run_offset, - recording.flags, - recording.start_time_90k, - recording.duration_90k, - recording.sample_file_bytes, - recording.video_samples, - recording.video_sync_samples, - recording.video_sample_entry_id - from - recording - where - stream_id = :stream_id and - recording.start_time_90k > :start_time_90k - {} and - recording.start_time_90k < :end_time_90k and - recording.start_time_90k + recording.duration_90k > :start_time_90k - order by - recording.start_time_90k - "#, recording::MAX_RECORDING_DURATION); let open = if read_write { let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?; let uuid = Uuid::new_v4(); @@ -1746,9 +1665,8 @@ impl Database { cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), streams_by_id: BTreeMap::new(), - video_sample_entries: BTreeMap::new(), + video_sample_entries_by_id: BTreeMap::new(), video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - list_recordings_by_time_sql, on_flush: Vec::new(), }))); { @@ -1871,7 +1789,8 @@ mod tests { assert_eq!(r.video_samples, row.video_samples); assert_eq!(r.video_sync_samples, row.video_sync_samples); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); - assert_eq!(row.video_sample_entry.rfc6381_codec, "avc1.4d0029"); + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); + assert_eq!(vse.rfc6381_codec, "avc1.4d0029"); Ok(()) }).unwrap(); } diff --git a/db/raw.rs b/db/raw.rs index 706f0ee..834938e 100644 --- a/db/raw.rs +++ b/db/raw.rs @@ -38,6 +38,49 @@ use rusqlite; use std::ops::Range; use uuid::Uuid; +// Note: the magic number "27000000" below is recording::MAX_RECORDING_DURATION. +const LIST_RECORDINGS_BY_TIME_SQL: &'static str = r#" + select + recording.composite_id, + recording.run_offset, + recording.flags, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes, + recording.video_samples, + recording.video_sync_samples, + recording.video_sample_entry_id + from + recording + where + stream_id = :stream_id and + recording.start_time_90k > :start_time_90k - 27000000 and + recording.start_time_90k < :end_time_90k and + recording.start_time_90k + recording.duration_90k > :start_time_90k + order by + recording.start_time_90k +"#; + +const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" + select + recording.composite_id, + recording.run_offset, + recording.flags, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes, + recording.video_samples, + recording.video_sync_samples, + recording.video_sample_entry_id + from + recording + where + :start <= composite_id and + composite_id < :end + order by + recording.composite_id +"#; + const INSERT_RECORDING_SQL: &'static str = r#" insert into recording (composite_id, stream_id, open_id, run_offset, flags, sample_file_bytes, start_time_90k, duration_90k, @@ -90,6 +133,51 @@ const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#" composite_id "#; +/// Lists the specified recordings in ascending order by start time, passing them to a supplied +/// function. Given that the function is called with the database lock held, it should be quick. +pub(crate) fn list_recordings_by_time( + conn: &rusqlite::Connection, stream_id: i32, desired_time: Range, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { + let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_TIME_SQL)?; + let rows = stmt.query_named(&[ + (":stream_id", &stream_id), + (":start_time_90k", &desired_time.start.0), + (":end_time_90k", &desired_time.end.0)])?; + list_recordings_inner(rows, f) +} + +/// Lists the specified recordings in ascending order by id. +pub(crate) fn list_recordings_by_id( + conn: &rusqlite::Connection, stream_id: i32, desired_ids: Range, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { + let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; + let rows = stmt.query_named(&[ + (":start", &CompositeId::new(stream_id, desired_ids.start).0), + (":end", &CompositeId::new(stream_id, desired_ids.end).0), + ])?; + list_recordings_inner(rows, f) +} + +fn list_recordings_inner(mut rows: rusqlite::Rows, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) + -> Result<(), Error> { + while let Some(row) = rows.next() { + let row = row?; + f(db::ListRecordingsRow { + id: CompositeId(row.get_checked(0)?), + run_offset: row.get_checked(1)?, + flags: row.get_checked(2)?, + start: recording::Time(row.get_checked(3)?), + duration_90k: row.get_checked(4)?, + sample_file_bytes: row.get_checked(5)?, + video_samples: row.get_checked(6)?, + video_sync_samples: row.get_checked(7)?, + video_sample_entry_id: row.get_checked(8)?, + })?; + } + Ok(()) +} + pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result { conn.query_row("select uuid from meta", &[], |row| -> Result { let uuid: FromSqlUuid = row.get_checked(0)?; diff --git a/db/recording.rs b/db/recording.rs index 219a573..a4bb0dc 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -380,7 +380,7 @@ impl Segment { frames: recording.video_samples as u16, key_frames: recording.video_sync_samples as u16, video_sample_entry_id_and_trailing_zero: - recording.video_sample_entry.id | + recording.video_sample_entry_id | ((((recording.flags & db::RecordingFlags::TrailingZero as i32) != 0) as i32) << 31), }; @@ -442,7 +442,7 @@ impl Segment { self_.begin = Some(begin); self_.file_end = it.pos; self_.video_sample_entry_id_and_trailing_zero = - recording.video_sample_entry.id | + recording.video_sample_entry_id | (((it.duration_90k == 0) as i32) << 31); Ok(self_) }) diff --git a/src/mp4.rs b/src/mp4.rs index d56cf57..5e41d95 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -762,8 +762,9 @@ impl FileBuilder { self.next_frame_num += s.s.frames as u32; self.segments.push(s); - if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry.id) { - self.video_sample_entries.push(row.video_sample_entry); + if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry_id) { + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); + self.video_sample_entries.push(vse.clone()); } Ok(()) } diff --git a/src/web.rs b/src/web.rs index 0aa1ae0..830e4ec 100644 --- a/src/web.rs +++ b/src/web.rs @@ -259,6 +259,7 @@ impl ServiceInner { .ok_or_else(|| format_err!("no such stream {}/{}", uuid, type_))?; db.list_aggregated_recordings(stream_id, r, split, &mut |row| { let end = row.ids.end - 1; // in api, ids are inclusive. + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); out.recordings.push(json::Recording { start_id: row.ids.start, end_id: if end == row.ids.start + 1 { None } else { Some(end) }, @@ -266,9 +267,9 @@ impl ServiceInner { end_time_90k: row.time.end.0, sample_file_bytes: row.sample_file_bytes, video_samples: row.video_samples, - video_sample_entry_width: row.video_sample_entry.width, - video_sample_entry_height: row.video_sample_entry.height, - video_sample_entry_sha1: strutil::hex(&row.video_sample_entry.sha1), + video_sample_entry_width: vse.width, + video_sample_entry_height: vse.height, + video_sample_entry_sha1: strutil::hex(&vse.sha1), }); Ok(()) })?; @@ -283,7 +284,7 @@ impl ServiceInner { fn init_segment(&self, sha1: [u8; 20], req: &Request) -> Result, Error> { let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment); let db = self.db.lock(); - for ent in db.video_sample_entries() { + for ent in db.video_sample_entries_by_id().values() { if ent.sha1 == sha1 { builder.append_video_sample_entry(ent.clone()); let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;