diff --git a/db/db.rs b/db/db.rs index 40e7ecc..53f5b1c 100644 --- a/db/db.rs +++ b/db/db.rs @@ -63,7 +63,6 @@ use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite; use schema; use std::collections::{BTreeMap, VecDeque}; -use std::collections::btree_map; use std::cell::RefCell; use std::cmp; use std::io::Write; @@ -96,26 +95,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = "update stream set next_recording_id = :next_recording_id where id = :stream_id"; -const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" - select - recording.composite_id, - recording.run_offset, - recording.flags, - recording.start_time_90k, - recording.duration_90k, - recording.sample_file_bytes, - recording.video_samples, - recording.video_sync_samples, - recording.video_sample_entry_id - from - recording - where - :start <= composite_id and - composite_id < :end - order by - recording.composite_id -"#; - pub struct FromSqlUuid(pub Uuid); impl rusqlite::types::FromSql for FromSqlUuid { @@ -155,7 +134,7 @@ pub struct VideoSampleEntry { #[derive(Debug)] pub struct ListRecordingsRow { pub start: recording::Time, - pub video_sample_entry: Arc, + pub video_sample_entry_id: i32, pub id: CompositeId, @@ -176,7 +155,7 @@ pub struct ListAggregatedRecordingsRow { pub video_samples: i64, pub video_sync_samples: i64, pub sample_file_bytes: i64, - pub video_sample_entry: Arc, + pub video_sample_entry_id: i32, pub stream_id: i32, pub flags: i32, pub run_start_id: i32, @@ -578,8 +557,7 @@ pub struct LockedDatabase { cameras_by_id: BTreeMap, streams_by_id: BTreeMap, cameras_by_uuid: BTreeMap, // values are ids. - video_sample_entries: BTreeMap>, - list_recordings_by_time_sql: String, + video_sample_entries_by_id: BTreeMap>, video_index_cache: RefCell, fnv::FnvBuildHasher>>, on_flush: Vec>, } @@ -1010,8 +988,8 @@ impl LockedDatabase { pub fn streams_by_id(&self) -> &BTreeMap { &self.streams_by_id } /// Returns an immutable view of the video sample entries. - pub fn video_sample_entries(&self) -> btree_map::Values> { - self.video_sample_entries.values() + pub fn video_sample_entries_by_id(&self) -> &BTreeMap> { + &self.video_sample_entries_by_id } /// Gets a given camera by uuid. @@ -1027,52 +1005,14 @@ impl LockedDatabase { pub fn list_recordings_by_time( &self, stream_id: i32, desired_time: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(&self.list_recordings_by_time_sql)?; - let rows = stmt.query_named(&[ - (":stream_id", &stream_id), - (":start_time_90k", &desired_time.start.0), - (":end_time_90k", &desired_time.end.0)])?; - self.list_recordings_inner(rows, f) + raw::list_recordings_by_time(&self.conn, stream_id, desired_time, f) } - /// Lists the specified recordigs in ascending order by id. + /// Lists the specified recordings in ascending order by id. pub fn list_recordings_by_id( &self, stream_id: i32, desired_ids: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; - let rows = stmt.query_named(&[ - (":start", &CompositeId::new(stream_id, desired_ids.start).0), - (":end", &CompositeId::new(stream_id, desired_ids.end).0), - ])?; - self.list_recordings_inner(rows, f) - } - - fn list_recordings_inner( - &self, mut rows: rusqlite::Rows, - f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - while let Some(row) = rows.next() { - let row = row?; - let id = CompositeId(row.get_checked::<_, i64>(0)?); - let vse_id = row.get_checked(8)?; - let video_sample_entry = match self.video_sample_entries.get(&vse_id) { - Some(v) => v, - None => bail!("recording {} references nonexistent video_sample_entry {}", - id, vse_id), - }; - let out = ListRecordingsRow { - id, - run_offset: row.get_checked(1)?, - flags: row.get_checked(2)?, - start: recording::Time(row.get_checked(3)?), - duration_90k: row.get_checked(4)?, - sample_file_bytes: row.get_checked(5)?, - video_samples: row.get_checked(6)?, - video_sync_samples: row.get_checked(7)?, - video_sample_entry: video_sample_entry.clone(), - }; - f(out)?; - } - Ok(()) + raw::list_recordings_by_id(&self.conn, stream_id, desired_ids, f) } /// Calls `list_recordings_by_time` and aggregates consecutive recordings. @@ -1103,7 +1043,7 @@ impl LockedDatabase { let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let new_dur = a.time.end - a.time.start + recording::Duration(row.duration_90k as i64); - a.ids.end != recording_id || row.video_sample_entry.id != a.video_sample_entry.id || + a.ids.end != recording_id || row.video_sample_entry_id != a.video_sample_entry_id || new_dur >= forced_split } else { false @@ -1133,7 +1073,7 @@ impl LockedDatabase { video_samples: row.video_samples as i64, video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, - video_sample_entry: row.video_sample_entry, + video_sample_entry_id: row.video_sample_entry_id, stream_id, run_start_id, flags: row.flags, @@ -1219,7 +1159,7 @@ impl LockedDatabase { sha1.copy_from_slice(&sha1_vec); let data: Vec = row.get_checked(5)?; - self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry { id: id as i32, width: row.get_checked::<_, i32>(2)? as u16, height: row.get_checked::<_, i32>(3)? as u16, @@ -1229,7 +1169,7 @@ impl LockedDatabase { })); } info!("Loaded {} video sample entries", - self.video_sample_entries.len()); + self.video_sample_entries_by_id.len()); Ok(()) } @@ -1379,7 +1319,7 @@ impl LockedDatabase { // Check if it already exists. // There shouldn't be too many entries, so it's fine to enumerate everything. - for (&id, v) in &self.video_sample_entries { + for (&id, v) in &self.video_sample_entries_by_id { if v.sha1 == sha1_bytes { // The width and height should match given that they're also specified within data // and thus included in the just-compared hash. @@ -1401,7 +1341,7 @@ impl LockedDatabase { ])?; let id = self.conn.last_insert_rowid() as i32; - self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry { + self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry { id, width, height, @@ -1707,27 +1647,6 @@ impl Database { // Note: the meta check comes after the version check to improve the error message when // trying to open a version 0 or version 1 database (which lacked the meta table). let uuid = raw::get_db_uuid(&conn)?; - let list_recordings_by_time_sql = format!(r#" - select - recording.composite_id, - recording.run_offset, - recording.flags, - recording.start_time_90k, - recording.duration_90k, - recording.sample_file_bytes, - recording.video_samples, - recording.video_sync_samples, - recording.video_sample_entry_id - from - recording - where - stream_id = :stream_id and - recording.start_time_90k > :start_time_90k - {} and - recording.start_time_90k < :end_time_90k and - recording.start_time_90k + recording.duration_90k > :start_time_90k - order by - recording.start_time_90k - "#, recording::MAX_RECORDING_DURATION); let open = if read_write { let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?; let uuid = Uuid::new_v4(); @@ -1746,9 +1665,8 @@ impl Database { cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), streams_by_id: BTreeMap::new(), - video_sample_entries: BTreeMap::new(), + video_sample_entries_by_id: BTreeMap::new(), video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - list_recordings_by_time_sql, on_flush: Vec::new(), }))); { @@ -1871,7 +1789,8 @@ mod tests { assert_eq!(r.video_samples, row.video_samples); assert_eq!(r.video_sync_samples, row.video_sync_samples); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); - assert_eq!(row.video_sample_entry.rfc6381_codec, "avc1.4d0029"); + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); + assert_eq!(vse.rfc6381_codec, "avc1.4d0029"); Ok(()) }).unwrap(); } diff --git a/db/raw.rs b/db/raw.rs index 706f0ee..834938e 100644 --- a/db/raw.rs +++ b/db/raw.rs @@ -38,6 +38,49 @@ use rusqlite; use std::ops::Range; use uuid::Uuid; +// Note: the magic number "27000000" below is recording::MAX_RECORDING_DURATION. +const LIST_RECORDINGS_BY_TIME_SQL: &'static str = r#" + select + recording.composite_id, + recording.run_offset, + recording.flags, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes, + recording.video_samples, + recording.video_sync_samples, + recording.video_sample_entry_id + from + recording + where + stream_id = :stream_id and + recording.start_time_90k > :start_time_90k - 27000000 and + recording.start_time_90k < :end_time_90k and + recording.start_time_90k + recording.duration_90k > :start_time_90k + order by + recording.start_time_90k +"#; + +const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" + select + recording.composite_id, + recording.run_offset, + recording.flags, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes, + recording.video_samples, + recording.video_sync_samples, + recording.video_sample_entry_id + from + recording + where + :start <= composite_id and + composite_id < :end + order by + recording.composite_id +"#; + const INSERT_RECORDING_SQL: &'static str = r#" insert into recording (composite_id, stream_id, open_id, run_offset, flags, sample_file_bytes, start_time_90k, duration_90k, @@ -90,6 +133,51 @@ const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#" composite_id "#; +/// Lists the specified recordings in ascending order by start time, passing them to a supplied +/// function. Given that the function is called with the database lock held, it should be quick. +pub(crate) fn list_recordings_by_time( + conn: &rusqlite::Connection, stream_id: i32, desired_time: Range, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { + let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_TIME_SQL)?; + let rows = stmt.query_named(&[ + (":stream_id", &stream_id), + (":start_time_90k", &desired_time.start.0), + (":end_time_90k", &desired_time.end.0)])?; + list_recordings_inner(rows, f) +} + +/// Lists the specified recordings in ascending order by id. +pub(crate) fn list_recordings_by_id( + conn: &rusqlite::Connection, stream_id: i32, desired_ids: Range, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { + let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; + let rows = stmt.query_named(&[ + (":start", &CompositeId::new(stream_id, desired_ids.start).0), + (":end", &CompositeId::new(stream_id, desired_ids.end).0), + ])?; + list_recordings_inner(rows, f) +} + +fn list_recordings_inner(mut rows: rusqlite::Rows, + f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) + -> Result<(), Error> { + while let Some(row) = rows.next() { + let row = row?; + f(db::ListRecordingsRow { + id: CompositeId(row.get_checked(0)?), + run_offset: row.get_checked(1)?, + flags: row.get_checked(2)?, + start: recording::Time(row.get_checked(3)?), + duration_90k: row.get_checked(4)?, + sample_file_bytes: row.get_checked(5)?, + video_samples: row.get_checked(6)?, + video_sync_samples: row.get_checked(7)?, + video_sample_entry_id: row.get_checked(8)?, + })?; + } + Ok(()) +} + pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result { conn.query_row("select uuid from meta", &[], |row| -> Result { let uuid: FromSqlUuid = row.get_checked(0)?; diff --git a/db/recording.rs b/db/recording.rs index 219a573..a4bb0dc 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -380,7 +380,7 @@ impl Segment { frames: recording.video_samples as u16, key_frames: recording.video_sync_samples as u16, video_sample_entry_id_and_trailing_zero: - recording.video_sample_entry.id | + recording.video_sample_entry_id | ((((recording.flags & db::RecordingFlags::TrailingZero as i32) != 0) as i32) << 31), }; @@ -442,7 +442,7 @@ impl Segment { self_.begin = Some(begin); self_.file_end = it.pos; self_.video_sample_entry_id_and_trailing_zero = - recording.video_sample_entry.id | + recording.video_sample_entry_id | (((it.duration_90k == 0) as i32) << 31); Ok(self_) }) diff --git a/src/mp4.rs b/src/mp4.rs index d56cf57..5e41d95 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -762,8 +762,9 @@ impl FileBuilder { self.next_frame_num += s.s.frames as u32; self.segments.push(s); - if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry.id) { - self.video_sample_entries.push(row.video_sample_entry); + if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry_id) { + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); + self.video_sample_entries.push(vse.clone()); } Ok(()) } diff --git a/src/web.rs b/src/web.rs index 0aa1ae0..830e4ec 100644 --- a/src/web.rs +++ b/src/web.rs @@ -259,6 +259,7 @@ impl ServiceInner { .ok_or_else(|| format_err!("no such stream {}/{}", uuid, type_))?; db.list_aggregated_recordings(stream_id, r, split, &mut |row| { let end = row.ids.end - 1; // in api, ids are inclusive. + let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); out.recordings.push(json::Recording { start_id: row.ids.start, end_id: if end == row.ids.start + 1 { None } else { Some(end) }, @@ -266,9 +267,9 @@ impl ServiceInner { end_time_90k: row.time.end.0, sample_file_bytes: row.sample_file_bytes, video_samples: row.video_samples, - video_sample_entry_width: row.video_sample_entry.width, - video_sample_entry_height: row.video_sample_entry.height, - video_sample_entry_sha1: strutil::hex(&row.video_sample_entry.sha1), + video_sample_entry_width: vse.width, + video_sample_entry_height: vse.height, + video_sample_entry_sha1: strutil::hex(&vse.sha1), }); Ok(()) })?; @@ -283,7 +284,7 @@ impl ServiceInner { fn init_segment(&self, sha1: [u8; 20], req: &Request) -> Result, Error> { let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment); let db = self.db.lock(); - for ent in db.video_sample_entries() { + for ent in db.video_sample_entries_by_id().values() { if ent.sha1 == sha1 { builder.append_video_sample_entry(ent.clone()); let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;