move list_recordings_by_* logic into raw.rs

I want to start having the db.rs version augment this with the uncommitted
recordings, and it's nice to have the separation of the raw db vs augmented
versions. Also, this fits with the general theme of shrinking db.rs a bit.

I had to put the raw video_sample_entry_id into the rows rather than
the video_sample_entry Arc. In hindsight, this is better anyway: the common
callers don't need to do the btree lookup and arc clone on every row. I think
I'd originally done it that way only because I was quite new to rust and
didn't understand that db could be used from within the row callback given
that both borrows are immutable.
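Where a caller does still need the full entry, the new pattern is a by-id map lookup from within the row callback, while the database lock is held. A minimal sketch against the API in this diff (the `stream_id` and `desired_time` values are placeholders, and error handling is elided):

    let db = db.lock();
    db.list_recordings_by_time(stream_id, desired_time, &mut |row| {
        // Both borrows of `db` here are immutable, so the lookup works
        // inside the callback; only callers that need the entry pay for it.
        let vse = db.video_sample_entries_by_id()
            .get(&row.video_sample_entry_id)
            .unwrap();  // sketch only; a real caller should handle a missing id
        println!("{}: codec {}", row.id, vse.rfc6381_codec);
        Ok(())
    })?;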
Scott Lamb 2018-03-01 20:59:05 -08:00
parent b2a8b3c216
commit b17761e871
5 changed files with 115 additions and 106 deletions

db/db.rs

@@ -63,7 +63,6 @@ use recording::{self, TIME_UNITS_PER_SEC};
use rusqlite;
use schema;
use std::collections::{BTreeMap, VecDeque};
-use std::collections::btree_map;
use std::cell::RefCell;
use std::cmp;
use std::io::Write;
@@ -96,26 +95,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
"update stream set next_recording_id = :next_recording_id where id = :stream_id";
-const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
-select
-recording.composite_id,
-recording.run_offset,
-recording.flags,
-recording.start_time_90k,
-recording.duration_90k,
-recording.sample_file_bytes,
-recording.video_samples,
-recording.video_sync_samples,
-recording.video_sample_entry_id
-from
-recording
-where
-:start <= composite_id and
-composite_id < :end
-order by
-recording.composite_id
-"#;
pub struct FromSqlUuid(pub Uuid);
impl rusqlite::types::FromSql for FromSqlUuid {
@@ -155,7 +134,7 @@ pub struct VideoSampleEntry {
#[derive(Debug)]
pub struct ListRecordingsRow {
pub start: recording::Time,
-pub video_sample_entry: Arc<VideoSampleEntry>,
+pub video_sample_entry_id: i32,
pub id: CompositeId,
@@ -176,7 +155,7 @@ pub struct ListAggregatedRecordingsRow {
pub video_samples: i64,
pub video_sync_samples: i64,
pub sample_file_bytes: i64,
-pub video_sample_entry: Arc<VideoSampleEntry>,
+pub video_sample_entry_id: i32,
pub stream_id: i32,
pub flags: i32,
pub run_start_id: i32,
@@ -578,8 +557,7 @@ pub struct LockedDatabase {
cameras_by_id: BTreeMap<i32, Camera>,
streams_by_id: BTreeMap<i32, Stream>,
cameras_by_uuid: BTreeMap<Uuid, i32>, // values are ids.
-video_sample_entries: BTreeMap<i32, Arc<VideoSampleEntry>>,
-list_recordings_by_time_sql: String,
+video_sample_entries_by_id: BTreeMap<i32, Arc<VideoSampleEntry>>,
video_index_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
on_flush: Vec<Box<Fn() + Send>>,
}
@@ -1010,8 +988,8 @@ impl LockedDatabase {
pub fn streams_by_id(&self) -> &BTreeMap<i32, Stream> { &self.streams_by_id }
/// Returns an immutable view of the video sample entries.
-pub fn video_sample_entries(&self) -> btree_map::Values<i32, Arc<VideoSampleEntry>> {
-self.video_sample_entries.values()
+pub fn video_sample_entries_by_id(&self) -> &BTreeMap<i32, Arc<VideoSampleEntry>> {
+&self.video_sample_entries_by_id
}
/// Gets a given camera by uuid.
@@ -1027,52 +1005,14 @@ impl LockedDatabase {
pub fn list_recordings_by_time(
&self, stream_id: i32, desired_time: Range<recording::Time>,
f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
-let mut stmt = self.conn.prepare_cached(&self.list_recordings_by_time_sql)?;
-let rows = stmt.query_named(&[
-(":stream_id", &stream_id),
-(":start_time_90k", &desired_time.start.0),
-(":end_time_90k", &desired_time.end.0)])?;
-self.list_recordings_inner(rows, f)
+raw::list_recordings_by_time(&self.conn, stream_id, desired_time, f)
}
-/// Lists the specified recordigs in ascending order by id.
+/// Lists the specified recordings in ascending order by id.
pub fn list_recordings_by_id(
&self, stream_id: i32, desired_ids: Range<i32>,
f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
-let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
-let rows = stmt.query_named(&[
-(":start", &CompositeId::new(stream_id, desired_ids.start).0),
-(":end", &CompositeId::new(stream_id, desired_ids.end).0),
-])?;
-self.list_recordings_inner(rows, f)
-}
-fn list_recordings_inner(
-&self, mut rows: rusqlite::Rows,
-f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
-while let Some(row) = rows.next() {
-let row = row?;
-let id = CompositeId(row.get_checked::<_, i64>(0)?);
-let vse_id = row.get_checked(8)?;
-let video_sample_entry = match self.video_sample_entries.get(&vse_id) {
-Some(v) => v,
-None => bail!("recording {} references nonexistent video_sample_entry {}",
-id, vse_id),
-};
-let out = ListRecordingsRow {
-id,
-run_offset: row.get_checked(1)?,
-flags: row.get_checked(2)?,
-start: recording::Time(row.get_checked(3)?),
-duration_90k: row.get_checked(4)?,
-sample_file_bytes: row.get_checked(5)?,
-video_samples: row.get_checked(6)?,
-video_sync_samples: row.get_checked(7)?,
-video_sample_entry: video_sample_entry.clone(),
-};
-f(out)?;
-}
-Ok(())
+raw::list_recordings_by_id(&self.conn, stream_id, desired_ids, f)
}
/// Calls `list_recordings_by_time` and aggregates consecutive recordings.
@@ -1103,7 +1043,7 @@ impl LockedDatabase {
let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64);
-a.ids.end != recording_id || row.video_sample_entry.id != a.video_sample_entry.id ||
+a.ids.end != recording_id || row.video_sample_entry_id != a.video_sample_entry_id ||
new_dur >= forced_split
} else {
false
@@ -1133,7 +1073,7 @@ impl LockedDatabase {
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
-video_sample_entry: row.video_sample_entry,
+video_sample_entry_id: row.video_sample_entry_id,
stream_id,
run_start_id,
flags: row.flags,
@@ -1219,7 +1159,7 @@ impl LockedDatabase {
sha1.copy_from_slice(&sha1_vec);
let data: Vec<u8> = row.get_checked(5)?;
-self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry {
+self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry {
id: id as i32,
width: row.get_checked::<_, i32>(2)? as u16,
height: row.get_checked::<_, i32>(3)? as u16,
@@ -1229,7 +1169,7 @@ impl LockedDatabase {
}));
}
info!("Loaded {} video sample entries",
-self.video_sample_entries.len());
+self.video_sample_entries_by_id.len());
Ok(())
}
@@ -1379,7 +1319,7 @@ impl LockedDatabase {
// Check if it already exists.
// There shouldn't be too many entries, so it's fine to enumerate everything.
-for (&id, v) in &self.video_sample_entries {
+for (&id, v) in &self.video_sample_entries_by_id {
if v.sha1 == sha1_bytes {
// The width and height should match given that they're also specified within data
// and thus included in the just-compared hash.
@@ -1401,7 +1341,7 @@ impl LockedDatabase {
])?;
let id = self.conn.last_insert_rowid() as i32;
-self.video_sample_entries.insert(id, Arc::new(VideoSampleEntry {
+self.video_sample_entries_by_id.insert(id, Arc::new(VideoSampleEntry {
id,
width,
height,
@@ -1707,27 +1647,6 @@ impl Database {
// Note: the meta check comes after the version check to improve the error message when
// trying to open a version 0 or version 1 database (which lacked the meta table).
let uuid = raw::get_db_uuid(&conn)?;
let list_recordings_by_time_sql = format!(r#"
select
recording.composite_id,
recording.run_offset,
recording.flags,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes,
recording.video_samples,
recording.video_sync_samples,
recording.video_sample_entry_id
from
recording
where
stream_id = :stream_id and
recording.start_time_90k > :start_time_90k - {} and
recording.start_time_90k < :end_time_90k and
recording.start_time_90k + recording.duration_90k > :start_time_90k
order by
recording.start_time_90k
"#, recording::MAX_RECORDING_DURATION);
let open = if read_write {
let mut stmt = conn.prepare(" insert into open (uuid) values (?)")?;
let uuid = Uuid::new_v4();
@@ -1746,9 +1665,8 @@ impl Database {
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
streams_by_id: BTreeMap::new(),
-video_sample_entries: BTreeMap::new(),
+video_sample_entries_by_id: BTreeMap::new(),
video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
-list_recordings_by_time_sql,
on_flush: Vec::new(),
})));
{
@@ -1871,7 +1789,8 @@ mod tests {
assert_eq!(r.video_samples, row.video_samples);
assert_eq!(r.video_sync_samples, row.video_sync_samples);
assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
-assert_eq!(row.video_sample_entry.rfc6381_codec, "avc1.4d0029");
+let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap();
+assert_eq!(vse.rfc6381_codec, "avc1.4d0029");
Ok(())
}).unwrap();
}

db/raw.rs

@@ -38,6 +38,49 @@ use rusqlite;
use std::ops::Range;
use uuid::Uuid;
+// Note: the magic number "27000000" below is recording::MAX_RECORDING_DURATION.
+const LIST_RECORDINGS_BY_TIME_SQL: &'static str = r#"
+select
+recording.composite_id,
+recording.run_offset,
+recording.flags,
+recording.start_time_90k,
+recording.duration_90k,
+recording.sample_file_bytes,
+recording.video_samples,
+recording.video_sync_samples,
+recording.video_sample_entry_id
+from
+recording
+where
+stream_id = :stream_id and
+recording.start_time_90k > :start_time_90k - 27000000 and
+recording.start_time_90k < :end_time_90k and
+recording.start_time_90k + recording.duration_90k > :start_time_90k
+order by
+recording.start_time_90k
+"#;
+const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
+select
+recording.composite_id,
+recording.run_offset,
+recording.flags,
+recording.start_time_90k,
+recording.duration_90k,
+recording.sample_file_bytes,
+recording.video_samples,
+recording.video_sync_samples,
+recording.video_sample_entry_id
+from
+recording
+where
+:start <= composite_id and
+composite_id < :end
+order by
+recording.composite_id
+"#;
const INSERT_RECORDING_SQL: &'static str = r#"
insert into recording (composite_id, stream_id, open_id, run_offset, flags,
sample_file_bytes, start_time_90k, duration_90k,
@@ -90,6 +133,51 @@ const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#"
composite_id
"#;
+/// Lists the specified recordings in ascending order by start time, passing them to a supplied
+/// function. Given that the function is called with the database lock held, it should be quick.
+pub(crate) fn list_recordings_by_time(
+conn: &rusqlite::Connection, stream_id: i32, desired_time: Range<recording::Time>,
+f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
+let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_TIME_SQL)?;
+let rows = stmt.query_named(&[
+(":stream_id", &stream_id),
+(":start_time_90k", &desired_time.start.0),
+(":end_time_90k", &desired_time.end.0)])?;
+list_recordings_inner(rows, f)
+}
+/// Lists the specified recordings in ascending order by id.
+pub(crate) fn list_recordings_by_id(
+conn: &rusqlite::Connection, stream_id: i32, desired_ids: Range<i32>,
+f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> {
+let mut stmt = conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
+let rows = stmt.query_named(&[
+(":start", &CompositeId::new(stream_id, desired_ids.start).0),
+(":end", &CompositeId::new(stream_id, desired_ids.end).0),
+])?;
+list_recordings_inner(rows, f)
+}
+fn list_recordings_inner(mut rows: rusqlite::Rows,
+f: &mut FnMut(db::ListRecordingsRow) -> Result<(), Error>)
+-> Result<(), Error> {
+while let Some(row) = rows.next() {
+let row = row?;
+f(db::ListRecordingsRow {
+id: CompositeId(row.get_checked(0)?),
+run_offset: row.get_checked(1)?,
+flags: row.get_checked(2)?,
+start: recording::Time(row.get_checked(3)?),
+duration_90k: row.get_checked(4)?,
+sample_file_bytes: row.get_checked(5)?,
+video_samples: row.get_checked(6)?,
+video_sync_samples: row.get_checked(7)?,
+video_sample_entry_id: row.get_checked(8)?,
+})?;
+}
+Ok(())
+}
pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result<Uuid, Error> {
conn.query_row("select uuid from meta", &[], |row| -> Result<Uuid, Error> {
let uuid: FromSqlUuid = row.get_checked(0)?;

db/recording.rs

@@ -380,7 +380,7 @@ impl Segment {
frames: recording.video_samples as u16,
key_frames: recording.video_sync_samples as u16,
video_sample_entry_id_and_trailing_zero:
-recording.video_sample_entry.id |
+recording.video_sample_entry_id |
((((recording.flags & db::RecordingFlags::TrailingZero as i32) != 0) as i32) << 31),
};
@@ -442,7 +442,7 @@ impl Segment {
self_.begin = Some(begin);
self_.file_end = it.pos;
self_.video_sample_entry_id_and_trailing_zero =
-recording.video_sample_entry.id |
+recording.video_sample_entry_id |
(((it.duration_90k == 0) as i32) << 31);
Ok(self_)
})

src/mp4.rs

@@ -762,8 +762,9 @@ impl FileBuilder {
self.next_frame_num += s.s.frames as u32;
self.segments.push(s);
-if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry.id) {
-self.video_sample_entries.push(row.video_sample_entry);
+if !self.video_sample_entries.iter().any(|e| e.id == row.video_sample_entry_id) {
+let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap();
+self.video_sample_entries.push(vse.clone());
}
Ok(())
}

src/web.rs

@@ -259,6 +259,7 @@ impl ServiceInner {
.ok_or_else(|| format_err!("no such stream {}/{}", uuid, type_))?;
db.list_aggregated_recordings(stream_id, r, split, &mut |row| {
let end = row.ids.end - 1; // in api, ids are inclusive.
+let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap();
out.recordings.push(json::Recording {
start_id: row.ids.start,
end_id: if end == row.ids.start + 1 { None } else { Some(end) },
@@ -266,9 +267,9 @@ impl ServiceInner {
end_time_90k: row.time.end.0,
sample_file_bytes: row.sample_file_bytes,
video_samples: row.video_samples,
-video_sample_entry_width: row.video_sample_entry.width,
-video_sample_entry_height: row.video_sample_entry.height,
-video_sample_entry_sha1: strutil::hex(&row.video_sample_entry.sha1),
+video_sample_entry_width: vse.width,
+video_sample_entry_height: vse.height,
+video_sample_entry_sha1: strutil::hex(&vse.sha1),
});
Ok(())
})?;
@@ -283,7 +284,7 @@ impl ServiceInner {
fn init_segment(&self, sha1: [u8; 20], req: &Request) -> Result<Response<slices::Body>, Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment);
let db = self.db.lock();
-for ent in db.video_sample_entries() {
+for ent in db.video_sample_entries_by_id().values() {
if ent.sha1 == sha1 {
builder.append_video_sample_entry(ent.clone());
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;