track cumulative duration and runs

This is useful for a combo scrub bar-based UI (#32) + live view UI (#59)
in a non-obvious way. When constructing an HTML Media Source Extensions
API SourceBuffer, the caller can specify a "mode" of either "segments"
or "sequence":

In "sequence" mode, playback assumes segments are added sequentially.
This is good enough for a live view-only UI (#59) but not for a scrub
bar UI, in which the user may seek backward to a segment they've never
seen before; that segment must then be inserted out of sequence.
Imagine what happens when the user then plays forward past the end of
the out-of-sequence segment: they should see the chronologically next
segment, or a pause for loading if it's unavailable. The best
approximation of this is to track the mapping of timestamps to segments
and insert a VTTCue with an enter/exit handler that seeks to the right
position. But seeking isn't instantaneous; the user will likely see a
brief flash of the wrong segment before the seek completes. That's
janky. Additionally, the "canplaythrough" event will behave strangely.

In "segments" mode, playback respects the timestamps we set:

* The obvious choice is to use wall clock timestamps. This is fine if
  they're known to be fixed and correct. They're not. The
  currently-recording segment may be "unanchored", meaning its start
  timestamp is not yet fixed. Older timestamps may overlap if the
  system clock was stepped between runs. The latter isn't /too/ bad
  from a user perspective, though it's confusing as a developer; we'll
  probably only end up showing the more recent recording for a given
  timestamp anyway. But the former is quite annoying: it means we have
  to throw away the part of the SourceBuffer that we may want to seek
  back to (causing UI pauses when that happens) or keep our own spare
  copy of it (memory bloat). I'd like to avoid the whole mess.

* Another approach is to use timestamps that are guaranteed to be in
  the correct order but that may have gaps. In particular, a timestamp
  of (recording_id * max_recording_duration) + time_within_recording.
  But again seeking isn't instantaneous. In my experiments, there's a
  visible pause between segments that drives me nuts.

* Finally, the approach that led me to this schema change. Use
  timestamps that place each segment after the one before, possibly with
  an intentional gap between runs (to force a wait where we have an
  actual gap). This should make the browser's natural playback behavior
  work properly: it never goes to an incorrect place, and it only waits
  when/if we want it to. We have to maintain a mapping between its
  timestamps and segment ids but that's doable.
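The third approach can be sketched as follows. This is an illustrative
sketch only, not code from this commit: `Recording`, `media_start_90k`,
and the per-run gap constant `GAP_90K` are invented names, and the gap
size is an assumption.

```rust
// Hypothetical sketch: derive a SourceBuffer "media timestamp" for each
// recording from the new prev_duration_90k / prev_runs columns, leaving
// an artificial gap between runs so the player stalls exactly where the
// recorded data has a real gap.

const GAP_90K: i64 = 90_000; // 1 second (in 90 kHz units) between runs (assumed)

struct Recording {
    prev_duration_90k: i64, // total duration of all earlier recordings
    prev_runs: i64,         // earlier rows with run_offset = 0
    run_offset: i32,        // position of this recording within its run
}

/// Media timestamp (90 kHz units) at which this recording's segment starts.
fn media_start_90k(r: &Recording) -> i64 {
    // One gap per run boundary before this recording. A recording with
    // run_offset == 0 begins run number `prev_runs`, so `prev_runs` gaps
    // precede it; later recordings in the same run are after one fewer.
    let gaps = if r.run_offset == 0 { r.prev_runs } else { r.prev_runs - 1 };
    r.prev_duration_90k + gaps * GAP_90K
}

fn main() {
    // First recording of the first run starts at 0.
    let a = Recording { prev_duration_90k: 0, prev_runs: 0, run_offset: 0 };
    // A later recording within the same run follows immediately.
    let b = Recording { prev_duration_90k: 450_000, prev_runs: 1, run_offset: 1 };
    // The first recording of the next run is pushed right by one gap.
    let c = Recording { prev_duration_90k: 900_000, prev_runs: 1, run_offset: 0 };
    assert_eq!(media_start_90k(&a), 0);
    assert_eq!(media_start_90k(&b), 450_000);
    assert_eq!(media_start_90k(&c), 900_000 + GAP_90K);
    println!("ok");
}
```

The UI would keep the inverse mapping (media timestamp range → recording
id) to translate the player's position back to a recording.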

This commit is only the schema change; the new data aren't exposed in
the API yet, much less used by a UI.

Note that stream.next_recording_id became stream.cum_recordings. I made
a slight definition change in the process: recording ids for new streams
start at 0 rather than 1. Various tests changed accordingly.

The upgrade process makes a best effort to backfill these new fields,
but of course it doesn't know the total duration or number of runs of
previously deleted rows. That's good enough.
Scott Lamb, 2020-06-09 16:17:32 -07:00
parent 6187aa64cf
commit f3ddbfe22a
9 changed files with 268 additions and 83 deletions

db/db.rs

@@ -103,8 +103,13 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
         :rfc6381_codec, :data)
 "#;
-const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
-    "update stream set next_recording_id = :next_recording_id where id = :stream_id";
+const UPDATE_STREAM_COUNTERS_SQL: &'static str = r#"
+    update stream
+    set cum_recordings = :cum_recordings,
+        cum_duration_90k = :cum_duration_90k,
+        cum_runs = :cum_runs
+    where id = :stream_id
+"#;
 pub struct FromSqlUuid(pub Uuid);
@@ -220,13 +225,20 @@ pub enum RecordingFlags {
     Uncommitted = 1 << 31,
 }
-/// A recording to pass to `insert_recording`.
+/// A recording to pass to `LockedDatabase::add_recording` and `raw::insert_recording`.
 #[derive(Clone, Debug, Default)]
 pub struct RecordingToInsert {
     pub run_offset: i32,
     pub flags: i32,
     pub sample_file_bytes: i32,
     pub start: recording::Time,
+
+    /// Filled in by `add_recording`.
+    pub prev_duration: recording::Duration,
+
+    /// Filled in by `add_recording`.
+    pub prev_runs: i32,
+
     pub duration_90k: i32,  // a recording::Duration, but guaranteed to fit in i32.
     pub local_time_delta: recording::Duration,
     pub video_samples: i32,
@@ -437,22 +449,28 @@ pub struct Stream {
     /// recordings.)
     pub bytes_to_add: i64,
 
-    /// The total duration of recorded data. This may not be `range.end - range.start` due to
-    /// gaps and overlap.
+    /// The total duration of undeleted recorded data. This may not be `range.end - range.start`
+    /// due to gaps and overlap.
     pub duration: recording::Duration,
 
     /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
     pub days: BTreeMap<StreamDayKey, StreamDayValue>,
     pub record: bool,
 
-    /// The `next_recording_id` currently committed to the database.
-    pub(crate) next_recording_id: i32,
+    /// The `cum_recordings` currently committed to the database.
+    pub(crate) cum_recordings: i32,
+
+    /// The `cum_duration_90k` currently committed to the database.
+    cum_duration: recording::Duration,
+
+    /// The `cum_runs` currently committed to the database.
+    cum_runs: i32,
 
     /// The recordings which have been added via `LockedDatabase::add_recording` but have yet to
     /// committed to the database.
     ///
-    /// `uncommitted[i]` uses sample filename `CompositeId::new(id, next_recording_id + 1)`;
-    /// `next_recording_id` should be advanced when one is committed to maintain this invariant.
+    /// `uncommitted[i]` uses sample filename `CompositeId::new(id, cum_recordings + i)`;
+    /// `cum_recordings` should be advanced when one is committed to maintain this invariant.
     ///
     /// TODO: alter the serving path to show these just as if they were already committed.
     uncommitted: VecDeque<Arc<Mutex<RecordingToInsert>>>,
@@ -734,9 +752,11 @@ impl StreamStateChanger {
                     // Insert stream.
                     let mut stmt = tx.prepare_cached(r#"
                         insert into stream (camera_id, sample_file_dir_id, type, rtsp_url, record,
-                                            retain_bytes, flush_if_sec, next_recording_id)
+                                            retain_bytes, flush_if_sec, cum_recordings,
+                                            cum_duration_90k, cum_runs)
                         values (:camera_id, :sample_file_dir_id, :type, :rtsp_url, :record,
-                                0, :flush_if_sec, 1)
+                                0, :flush_if_sec, 0,
+                                0, 0)
                     "#)?;
                     stmt.execute_named(named_params!{
                         ":camera_id": camera_id,
@@ -781,7 +801,9 @@ impl StreamStateChanger {
                     duration: recording::Duration(0),
                     days: BTreeMap::new(),
                     record: sc.record,
-                    next_recording_id: 1,
+                    cum_recordings: 0,
+                    cum_duration: recording::Duration(0),
+                    cum_runs: 0,
                     uncommitted: VecDeque::new(),
                     synced_recordings: 0,
                     on_live_segment: Vec::new(),
@@ -820,18 +842,35 @@ impl LockedDatabase {
     pub fn flushes(&self) -> usize { self.flush_count }
 
     /// Adds a placeholder for an uncommitted recording.
+    ///
     /// The caller should write samples and fill the returned `RecordingToInsert` as it goes
     /// (noting that while holding the lock, it should not perform I/O or acquire the database
     /// lock). Then it should sync to permanent storage and call `mark_synced`. The data will
     /// be written to the database on the next `flush`.
-    pub(crate) fn add_recording(&mut self, stream_id: i32, r: RecordingToInsert)
+    ///
+    /// A call to `add_recording` is also a promise that previous recordings (even if not yet
+    /// synced and committed) won't change.
+    ///
+    /// This fills the `prev_duration` and `prev_runs` fields.
+    pub(crate) fn add_recording(&mut self, stream_id: i32, mut r: RecordingToInsert)
                                 -> Result<(CompositeId, Arc<Mutex<RecordingToInsert>>), Error> {
         let stream = match self.streams_by_id.get_mut(&stream_id) {
             None => bail!("no such stream {}", stream_id),
             Some(s) => s,
         };
         let id = CompositeId::new(stream_id,
-                                  stream.next_recording_id + (stream.uncommitted.len() as i32));
+                                  stream.cum_recordings + (stream.uncommitted.len() as i32));
+        match stream.uncommitted.back() {
+            Some(s) => {
+                let l = s.lock();
+                r.prev_duration = l.prev_duration + recording::Duration(l.duration_90k.into());
+                r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
+            },
+            None => {
+                r.prev_duration = stream.cum_duration;
+                r.prev_runs = stream.cum_runs;
+            },
+        };
         let recording = Arc::new(Mutex::new(r));
         stream.uncommitted.push_back(Arc::clone(&recording));
         Ok((id, recording))
@@ -844,10 +883,10 @@ impl LockedDatabase {
             None => bail!("no stream for recording {}", id),
             Some(s) => s,
         };
-        let next_unsynced = stream.next_recording_id + (stream.synced_recordings as i32);
+        let next_unsynced = stream.cum_recordings + (stream.synced_recordings as i32);
         if id.recording() != next_unsynced {
             bail!("can't sync {} when next unsynced recording is {} (next unflushed is {})",
-                  id, next_unsynced, stream.next_recording_id);
+                  id, next_unsynced, stream.cum_recordings);
         }
         if stream.synced_recordings == stream.uncommitted.len() {
             bail!("can't sync un-added recording {}", id);
@@ -924,19 +963,25 @@ impl LockedDatabase {
         let mut new_ranges = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
                                                                   Default::default());
         {
-            let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
+            let mut stmt = tx.prepare_cached(UPDATE_STREAM_COUNTERS_SQL)?;
             for (&stream_id, s) in &self.streams_by_id {
                 // Process additions.
+                let mut new_duration = 0;
+                let mut new_runs = 0;
                 for i in 0..s.synced_recordings {
                     let l = s.uncommitted[i].lock();
                     raw::insert_recording(
-                        &tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &l)?;
+                        &tx, o, CompositeId::new(stream_id, s.cum_recordings + i as i32), &l)?;
+                    new_duration += i64::from(l.duration_90k);
+                    new_runs += if l.run_offset == 0 { 1 } else { 0 };
                 }
                 if s.synced_recordings > 0 {
                     new_ranges.entry(stream_id).or_insert(None);
                     stmt.execute_named(named_params!{
                         ":stream_id": stream_id,
-                        ":next_recording_id": s.next_recording_id + s.synced_recordings as i32,
+                        ":cum_recordings": s.cum_recordings + s.synced_recordings as i32,
+                        ":cum_duration_90k": s.cum_duration.0 + new_duration,
+                        ":cum_runs": s.cum_runs + new_runs,
                     })?;
                 }
@@ -1026,10 +1071,14 @@ impl LockedDatabase {
             log.added.reserve(s.synced_recordings);
             for _ in 0..s.synced_recordings {
                 let u = s.uncommitted.pop_front().unwrap();
-                log.added.push(CompositeId::new(stream_id, s.next_recording_id));
-                s.next_recording_id += 1;
+                log.added.push(CompositeId::new(stream_id, s.cum_recordings));
                 let l = u.lock();
-                let end = l.start + recording::Duration(l.duration_90k as i64);
+                s.cum_recordings += 1;
+                let dur = recording::Duration(l.duration_90k.into());
+                s.cum_duration += dur;
+                s.cum_runs += if l.run_offset == 0 { 1 } else { 0 };
+                let end = l.start + dur;
+                info!("range={:?}", l.start .. end);
                 s.add_recording(l.start .. end, l.sample_file_bytes);
             }
             s.synced_recordings = 0;
@@ -1177,7 +1226,7 @@ impl LockedDatabase {
                     if l.start > desired_time.end || end < desired_time.start {
                         continue;  // there's no overlap with the requested range.
                     }
-                    l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32),
+                    l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
                                   self.open.unwrap().id)
                 } else {
                     continue;
@@ -1196,18 +1245,18 @@ impl LockedDatabase {
             None => bail!("no such stream {}", stream_id),
             Some(s) => s,
         };
-        if desired_ids.start < s.next_recording_id {
+        if desired_ids.start < s.cum_recordings {
             raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?;
         }
-        if desired_ids.end > s.next_recording_id {
-            let start = cmp::max(0, desired_ids.start - s.next_recording_id) as usize;
-            let end = cmp::min((desired_ids.end - s.next_recording_id) as usize,
+        if desired_ids.end > s.cum_recordings {
+            let start = cmp::max(0, desired_ids.start - s.cum_recordings) as usize;
+            let end = cmp::min((desired_ids.end - s.cum_recordings) as usize,
                                s.uncommitted.len());
             for i in start .. end {
                 let row = {
                     let l = s.uncommitted[i].lock();
                     if l.video_samples > 0 {
-                        l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32),
+                        l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
                                       self.open.unwrap().id)
                     } else {
                         continue;
@@ -1300,11 +1349,11 @@ impl LockedDatabase {
         let s = self.streams_by_id
                     .get(&id.stream())
                     .ok_or_else(|| format_err!("no stream for {}", id))?;
-        if s.next_recording_id <= id.recording() {
-            let i = id.recording() - s.next_recording_id;
+        if s.cum_recordings <= id.recording() {
+            let i = id.recording() - s.cum_recordings;
             if i as usize >= s.uncommitted.len() {
                 bail!("no such recording {}; latest committed is {}, latest is {}",
-                      id, s.next_recording_id, s.next_recording_id + s.uncommitted.len() as i32);
+                      id, s.cum_recordings, s.cum_recordings + s.uncommitted.len() as i32);
             }
             let l = s.uncommitted[i as usize].lock();
             return f(&RecordingPlayback { video_index: &l.video_index });
@@ -1474,7 +1523,9 @@ impl LockedDatabase {
                 rtsp_url,
                 retain_bytes,
                 flush_if_sec,
-                next_recording_id,
+                cum_recordings,
+                cum_duration_90k,
+                cum_runs,
                 record
             from
                 stream;
@@ -1507,8 +1558,10 @@ impl LockedDatabase {
                 bytes_to_add: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
-                next_recording_id: row.get(7)?,
-                record: row.get(8)?,
+                cum_recordings: row.get(7)?,
+                cum_duration: recording::Duration(row.get(8)?),
+                cum_runs: row.get(9)?,
+                record: row.get(10)?,
                 uncommitted: VecDeque::new(),
                 synced_recordings: 0,
                 on_live_segment: Vec::new(),
@@ -2310,12 +2363,14 @@ mod tests {
         }
         let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid };
         assert_no_recordings(&db, camera_uuid);
+        assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 0);
 
         // Closing and reopening the database should present the same contents.
         let conn = db.close();
         let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
         assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2);
         assert_no_recordings(&db, camera_uuid);
+        assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 0);
 
         // TODO: assert_eq!(db.lock().list_garbage(sample_file_dir_id).unwrap(), &[]);
@@ -2336,6 +2391,8 @@ mod tests {
             run_offset: 0,
             flags: 0,
             start,
+            prev_duration: recording::Duration(0),
+            prev_runs: 0,
             duration_90k: TIME_UNITS_PER_SEC as i32,
             local_time_delta: recording::Duration(0),
             video_samples: 1,
@@ -2351,7 +2408,7 @@ mod tests {
             db.flush("add test").unwrap();
             id
         };
-        assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().next_recording_id, 2);
+        assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 1);
 
         // Queries should return the correct result (with caches update on insert).
         assert_single_recording(&db, main_stream_id, &recording);


@@ -174,14 +174,15 @@ pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result<Uuid, Error> {
 /// Inserts the specified recording (for from `try_flush` only).
 pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
                                r: &db::RecordingToInsert) -> Result<(), Error> {
     let mut stmt = tx.prepare_cached(r#"
         insert into recording (composite_id, stream_id, open_id, run_offset, flags,
-                               sample_file_bytes, start_time_90k, duration_90k,
-                               video_samples, video_sync_samples, video_sample_entry_id)
+                               sample_file_bytes, start_time_90k, prev_duration_90k,
+                               prev_runs, duration_90k, video_samples, video_sync_samples,
+                               video_sample_entry_id)
         values (:composite_id, :stream_id, :open_id, :run_offset, :flags,
-                :sample_file_bytes, :start_time_90k, :duration_90k,
-                :video_samples, :video_sync_samples,
+                :sample_file_bytes, :start_time_90k, :prev_duration_90k,
+                :prev_runs, :duration_90k, :video_samples, :video_sync_samples,
                 :video_sample_entry_id)
     "#).with_context(|e| format!("can't prepare recording insert: {}", e))?;
     stmt.execute_named(named_params!{
@@ -193,6 +194,8 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
         ":sample_file_bytes": r.sample_file_bytes,
         ":start_time_90k": r.start.0,
         ":duration_90k": r.duration_90k,
+        ":prev_duration_90k": r.prev_duration.0,
+        ":prev_runs": r.prev_runs,
         ":video_samples": r.video_samples,
         ":video_sync_samples": r.video_sync_samples,
         ":video_sample_entry_id": r.video_sample_entry_id,


@@ -145,10 +145,16 @@ create table stream (
   -- then fails again, forever.
   flush_if_sec integer not null,
 
-  -- The low 32 bits of the next recording id to assign for this stream.
-  -- Typically this is the maximum current recording + 1, but it does
-  -- not decrease if that recording is deleted.
-  next_recording_id integer not null check (next_recording_id >= 0),
+  -- The total number of recordings ever created on this stream, including
+  -- deleted ones. This is used for assigning the next recording id.
+  cum_recordings integer not null check (cum_recordings >= 0),
+
+  -- The total duration of all recordings ever created on this stream.
+  cum_duration_90k integer not null check (cum_duration_90k >= 0),
+
+  -- The total number of runs (recordings with run_offset = 0) ever created
+  -- on this stream.
+  cum_runs integer not null check (cum_runs >= 0),
 
   unique (camera_id, type)
 );
@@ -158,12 +164,13 @@ create table stream (
 create table recording (
   -- The high 32 bits of composite_id are taken from the stream's id, which
   -- improves locality. The low 32 bits are taken from the stream's
-  -- next_recording_id (which should be post-incremented in the same
+  -- cum_recordings (which should be post-incremented in the same
   -- transaction). It'd be simpler to use a "without rowid" table and separate
   -- fields to make up the primary key, but
-  -- <https://www.sqlite.org/withoutrowid.html> points out that "without rowid"
-  -- is not appropriate when the average row size is in excess of 50 bytes.
-  -- recording_cover rows (which match this id format) are typically 1--5 KiB.
+  -- <https://www.sqlite.org/withoutrowid.html> points out that "without
+  -- rowid" is not appropriate when the average row size is in excess of 50
+  -- bytes. recording_cover rows (which match this id format) are typically
+  -- 1--5 KiB.
   composite_id integer primary key,
 
   -- The open in which this was committed to the database. For a given
@@ -172,13 +179,13 @@ create table recording (
   -- This field allows disambiguation in etags and such.
   open_id integer not null references open (id),
 
-  -- This field is redundant with id above, but used to enforce the reference
-  -- constraint and to structure the recording_start_time index.
+  -- This field is redundant with composite_id above, but used to enforce the
+  -- reference constraint and to structure the recording_start_time index.
   stream_id integer not null references stream (id),
 
   -- The offset of this recording within a run. 0 means this was the first
-  -- recording made from a RTSP session. The start of the run has id
-  -- (id-run_offset).
+  -- recording made from a RTSP session. The start of the run has composite_id
+  -- (composite_id-run_offset).
   run_offset integer not null,
 
   -- flags is a bitmask:
@@ -193,9 +200,18 @@ create table recording (
   -- The starting time of the recording, in 90 kHz units since
   -- 1970-01-01 00:00:00 UTC excluding leap seconds. Currently on initial
   -- connection, this is taken from the local system time; on subsequent
-  -- recordings, it exactly matches the previous recording's end time.
+  -- recordings in a run, it exactly matches the previous recording's end
+  -- time.
   start_time_90k integer not null check (start_time_90k > 0),
 
+  -- The total duration of all previous recordings on this stream. This is
+  -- returned in API requests and may be helpful for timestamps in a HTML
+  -- MediaSourceExtensions SourceBuffer.
+  prev_duration_90k integer not null check (prev_duration_90k >= 0),
+
+  -- The total number of previous runs (rows in which run_offset = 0).
+  prev_runs integer not null check (prev_runs >= 0),
+
   -- The duration of the recording, in 90 kHz units.
   duration_90k integer not null
       check (duration_90k >= 0 and duration_90k < 5*60*90000),
@@ -280,7 +296,7 @@ create table recording_playback (
 -- Files which are to be deleted (may or may not still exist).
 -- Note that besides these files, for each stream, any recordings >= its
--- next_recording_id should be discarded on startup.
+-- cum_recordings should be discarded on startup.
 create table garbage (
   -- This is _mostly_ redundant with composite_id, which contains the stream
   -- id and thus a linkage to the sample file directory. Listing it here


@@ -141,6 +141,38 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
         })?;
     }
     tx.execute_batch(r#"
+        alter table stream rename to old_stream;
+        create table stream (
+            id integer primary key,
+            camera_id integer not null references camera (id),
+            sample_file_dir_id integer references sample_file_dir (id),
+            type text not null check (type in ('main', 'sub')),
+            record integer not null check (record in (1, 0)),
+            rtsp_url text not null,
+            retain_bytes integer not null check (retain_bytes >= 0),
+            flush_if_sec integer not null,
+            cum_recordings integer not null check (cum_recordings >= 0),
+            cum_duration_90k integer not null check (cum_duration_90k >= 0),
+            cum_runs integer not null check (cum_runs >= 0),
+            unique (camera_id, type)
+        );
+        insert into stream
+        select
+            s.id,
+            s.camera_id,
+            s.sample_file_dir_id,
+            s.type,
+            s.record,
+            s.rtsp_url,
+            s.retain_bytes,
+            s.flush_if_sec,
+            s.next_recording_id as cum_recordings,
+            coalesce(sum(r.duration_90k), 0) as cum_duration_90k,
+            coalesce(sum(case when r.run_offset = 0 then 1 else 0 end), 0) as cum_runs
+        from
+            old_stream s left join recording r on (s.id = r.stream_id)
+        group by 1;
+
         alter table recording rename to old_recording;
         create table recording (
             composite_id integer primary key,
@@ -150,6 +182,8 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
             flags integer not null,
             sample_file_bytes integer not null check (sample_file_bytes > 0),
             start_time_90k integer not null check (start_time_90k > 0),
+            prev_duration_90k integer not null check (prev_duration_90k >= 0),
+            prev_runs integer not null check (prev_runs >= 0),
             duration_90k integer not null
                 check (duration_90k >= 0 and duration_90k < 5*60*90000),
             video_samples integer not null check (video_samples > 0),
@@ -157,7 +191,77 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
             video_sample_entry_id integer references video_sample_entry (id),
             check (composite_id >> 32 = stream_id)
         );
-        insert into recording select * from old_recording;
+    "#)?;
+
+    // SQLite added window functions in 3.25.0. macOS still ships SQLite 3.24.0 (no support).
+    // Compute cumulative columns by hand.
+    let mut cur_stream_id = None;
+    let mut cum_duration_90k = 0;
+    let mut cum_runs = 0;
+    let mut stmt = tx.prepare(r#"
+        select
+            composite_id,
+            open_id,
+            stream_id,
+            run_offset,
+            flags,
+            sample_file_bytes,
+            start_time_90k,
+            duration_90k,
+            video_samples,
+            video_sync_samples,
+            video_sample_entry_id
+        from
+            old_recording
+        order by composite_id
+    "#)?;
+    let mut insert = tx.prepare(r#"
+        insert into recording (composite_id, open_id, stream_id, run_offset, flags,
+                               sample_file_bytes, start_time_90k, prev_duration_90k, prev_runs,
+                               duration_90k, video_samples, video_sync_samples,
+                               video_sample_entry_id)
+        values (:composite_id, :open_id, :stream_id, :run_offset, :flags,
+                :sample_file_bytes, :start_time_90k, :prev_duration_90k, :prev_runs,
+                :duration_90k, :video_samples, :video_sync_samples,
+                :video_sample_entry_id)
+    "#)?;
+    let mut rows = stmt.query(params![])?;
+    while let Some(row) = rows.next()? {
+        let composite_id: i64 = row.get(0)?;
+        let open_id: i32 = row.get(1)?;
+        let stream_id: i32 = row.get(2)?;
+        let run_offset: i32 = row.get(3)?;
+        let flags: i32 = row.get(4)?;
+        let sample_file_bytes: i32 = row.get(5)?;
+        let start_time_90k: i64 = row.get(6)?;
+        let duration_90k: i32 = row.get(7)?;
+        let video_samples: i32 = row.get(8)?;
+        let video_sync_samples: i32 = row.get(9)?;
+        let video_sample_entry_id: i32 = row.get(10)?;
+        if cur_stream_id != Some(stream_id) {
+            cum_duration_90k = 0;
+            cum_runs = 0;
+            cur_stream_id = Some(stream_id);
+        }
+        insert.execute_named(named_params!{
+            ":composite_id": composite_id,
+            ":open_id": open_id,
+            ":stream_id": stream_id,
+            ":run_offset": run_offset,
+            ":flags": flags,
+            ":sample_file_bytes": sample_file_bytes,
+            ":start_time_90k": start_time_90k,
+            ":prev_duration_90k": cum_duration_90k,
+            ":prev_runs": cum_runs,
+            ":duration_90k": duration_90k,
+            ":video_samples": video_samples,
+            ":video_sync_samples": video_sync_samples,
+            ":video_sample_entry_id": video_sample_entry_id,
+        })?;
+        cum_duration_90k += duration_90k;
+        cum_runs += if run_offset == 0 { 1 } else { 0 };
+    }
+
+    tx.execute_batch(r#"
         drop index recording_cover;
         create index recording_cover on recording (
             stream_id,
@@ -172,7 +276,6 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
             flags
         );
-
         alter table recording_integrity rename to old_recording_integrity;
         create table recording_integrity (
             composite_id integer primary key references recording (composite_id),
@ -201,6 +304,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
drop table old_recording_playback; drop table old_recording_playback;
drop table old_recording_integrity; drop table old_recording_integrity;
drop table old_recording; drop table old_recording;
drop table old_stream;
drop table old_video_sample_entry; drop table old_video_sample_entry;
update user_session update user_session
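The upgrade loop above resets its counters whenever the stream changes and counts a new run only when `run_offset` is zero. A minimal standalone sketch of that accumulation logic (the `Row` struct and `cumulative` helper are hypothetical, for illustration; the real upgrade reads rows from the `old_recording` table):

```rust
/// Minimal model of a recording row; hypothetical, for illustration only.
struct Row {
    stream_id: i32,
    run_offset: i32,
    duration_90k: i32,
}

/// Returns (prev_duration_90k, prev_runs) for each row, in input order.
fn cumulative(rows: &[Row]) -> Vec<(i64, i32)> {
    let mut out = Vec::with_capacity(rows.len());
    let mut cur_stream_id = None;
    let mut cum_duration_90k = 0i64;
    let mut cum_runs = 0i32;
    for r in rows {
        if cur_stream_id != Some(r.stream_id) {
            // The counters are per-stream; reset when the stream changes.
            cum_duration_90k = 0;
            cum_runs = 0;
            cur_stream_id = Some(r.stream_id);
        }
        out.push((cum_duration_90k, cum_runs));
        cum_duration_90k += i64::from(r.duration_90k);
        // run_offset == 0 marks the first recording of a new run.
        cum_runs += if r.run_offset == 0 { 1 } else { 0 };
    }
    out
}

fn main() {
    let rows = [
        Row { stream_id: 1, run_offset: 0, duration_90k: 90 },
        Row { stream_id: 1, run_offset: 1, duration_90k: 60 },
        Row { stream_id: 2, run_offset: 0, duration_90k: 30 },
    ];
    // The second row inherits stream 1's totals; stream 2 starts fresh.
    assert_eq!(cumulative(&rows), vec![(0, 0), (90, 1), (0, 0)]);
}
```

Note that each row stores the totals *before* it, so the first recording of every stream gets `(0, 0)`.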


@ -291,7 +291,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
                .iter()
                .filter_map(|(&k, v)| {
                    if v.sample_file_dir_id == Some(dir_id) {
                        Some((k, v.cum_recordings))
                    } else {
                        None
                    }
@ -497,7 +497,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
            }
        };
        if s.cum_recordings <= f.recording.recording() { // not yet committed.
            break;
        }
@ -1008,7 +1008,7 @@ mod tests {
        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
                                video_sample_entry_id);
        let f = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
            Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
@ -1025,13 +1025,13 @@ mod tests {
        // Then a 1-byte recording.
        let f = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
            Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
        w.write(b"4", recording::Time(3), 1, true).unwrap();
        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new({
            let db = h.db.clone();
            move |_| {
                // The drop(w) below should cause the old recording to be deleted (moved to
@ -1096,9 +1096,9 @@ mod tests {
        }).unwrap();
        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
                                video_sample_entry_id);
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0), Box::new(|_id| Err(nix_eio()))));
        let f = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
            Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
        f.expect(MockFileAction::Write(Box::new(|buf| {
            assert_eq!(buf, b"1234");
@ -1167,7 +1167,7 @@ mod tests {
        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
                                video_sample_entry_id);
        let f = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
            Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
@ -1185,13 +1185,13 @@ mod tests {
        // Then a 1-byte recording.
        let f = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
            Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
        f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
        f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
        w.write(b"4", recording::Time(3), 1, true).unwrap();
        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new({
            let db = h.db.clone();
            move |_| {
                // The drop(w) below should cause the old recording to be deleted (moved to
@ -1208,7 +1208,7 @@ mod tests {
                Err(nix_eio()) // force a retry.
            }
        })));
        h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new(|_| Ok(()))));
        h.dir.expect(MockDirAction::Sync(Box::new(|| Err(nix_eio()))));
        h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
@ -1264,7 +1264,7 @@ mod tests {
        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
                                video_sample_entry_id);
        let f1 = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
            Box::new({ let f = f1.clone(); move |_id| Ok(f.clone()) })));
        f1.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
        f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
@ -1285,7 +1285,7 @@ mod tests {
        let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
                                video_sample_entry_id);
        let f2 = MockFile::new();
        h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
            Box::new({ let f = f2.clone(); move |_id| Ok(f.clone()) })));
        f2.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
        f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));


@ -386,7 +386,7 @@ Precondition: database open read-write.
1. Remove all sample files (of all three categories described below:
   `recording` table rows, `garbage` table rows, and files with recording
   ids >= their stream's `cum_recordings`); see "delete a recording"
   procedure below.
2. Rewrite the directory metadata with `in_progress_open` set to the current open,
   `last_complete_open` cleared.
@ -403,7 +403,7 @@ three invariants about sample files:
2. Exactly one of the following statements is true for every sample file:
   * It has a `recording` table row.
   * It has a `garbage` table row.
   * Its recording id is greater than or equal to the `cum_recordings`
     for its stream.
3. After an orderly shutdown of Moonfire NVR, there is a `recording` table row
   for every sample file, even if there have been previous crashes.
@ -441,11 +441,11 @@ These invariants are updated through the following procedure:
1. Acquire a lock to guarantee this is the only Moonfire NVR process running
   against the given database. This lock is not released until program shutdown.
2. Query the `garbage` table and the `cum_recordings` field in the `stream` table.
3. `unlink()` all the sample files associated with garbage rows, ignoring
   `ENOENT`.
4. For each stream, `unlink()` all the existing files with recording ids >=
   `cum_recordings`.
5. `fsync()` the sample file directory.
6. Delete all rows from the `garbage` table.


@ -254,5 +254,9 @@ Version 6 adds over version 5:
  or Blake2b (for sessions).
* a preliminary schema for [object
  detection](https://en.wikipedia.org/wiki/Object_detection).
* for each recording row, the cumulative total duration and "runs" recorded
  before it on that stream. This is useful for Media Source Extensions-based
  web browser UIs when setting timestamps of video segments in the
  SourceBuffer.

On upgrading to this version, sessions will be revoked.
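To make the new columns' purpose concrete, here is a hedged sketch (the helper name and its use are hypothetical, not part of the codebase) of deriving a segment's media start time for a SourceBuffer in "segments" mode from `prev_duration_90k`:

```rust
/// 90 kHz ticks per second, the schema's time base.
const TIME_UNITS_PER_SEC: i64 = 90_000;

/// Hypothetical helper: the media-time start of a recording, in seconds,
/// for a SourceBuffer in "segments" mode. Because prev_duration_90k is the
/// sum of all earlier durations on the stream, these timestamps are
/// monotonic and gap-free even if the wall clock was stepped between runs
/// or the currently-recording segment's start time is not yet anchored.
fn media_start_secs(prev_duration_90k: i64) -> f64 {
    prev_duration_90k as f64 / TIME_UNITS_PER_SEC as f64
}

fn main() {
    // A recording preceded by two seconds of earlier video starts at t=2.0.
    assert!((media_start_secs(2 * TIME_UNITS_PER_SEC) - 2.0).abs() < 1e-9);
}
```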


@ -1863,7 +1863,8 @@ mod tests {
        let db = tdb.db.lock();
        db.list_recordings_by_time(TEST_STREAM_ID, all_time, &mut |r| {
            let d = r.duration_90k;
            assert!(skip_90k + shorten_90k < d, "skip_90k={} shorten_90k={} r={:?}",
                    skip_90k, shorten_90k, r);
            builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap();
            Ok(())
        }).unwrap();
@ -2193,7 +2194,7 @@ mod tests {
        assert_eq!("e95f2d261cdebac5b9983abeea59e8eb053dc4efac866722544c665d9de7c49d",
                   hash.to_hex().as_str());
        const EXPECTED_ETAG: &'static str =
            "\"61031ab36449b4d1186e9513b5e40df84e78bfb2807c0035b360437bb905cdd5\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
@ -2216,7 +2217,7 @@ mod tests {
        assert_eq!("77e09be8ee5ca353ca56f9a80bb7420680713c80a0831d236fac45a96aa3b3d4",
                   hash.to_hex().as_str());
        const EXPECTED_ETAG: &'static str =
            "\"8e048b22b21c9b93d889e8dfbeeb56fa1b17dc0956190f5c3acc84f6f674089f\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
@ -2239,7 +2240,7 @@ mod tests {
        assert_eq!("f9807cfc6b96a399f3a5ad62d090f55a18543a9eeb1f48d59f86564ffd9b1e84",
                   hash.to_hex().as_str());
        const EXPECTED_ETAG: &'static str =
            "\"196192eccd8be2c840dfc4073355efe5c917999641e3d0a2b87e0d2eab40267f\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
@ -2262,7 +2263,7 @@ mod tests {
        assert_eq!("5211104e1fdfe3bbc0d7d7d479933940305ff7f23201e97308db23a022ee6339",
                   hash.to_hex().as_str());
        const EXPECTED_ETAG: &'static str =
            "\"9e50099d86ae1c742e65f7a4151c4427f42051a87158405a35b4e5550fd05c30\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();


@ -382,7 +382,7 @@ mod tests {
        // 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later:
        // * the first rotation is always skipped
        // * the second rotation is deferred until a key frame.
        assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 0)), &[
            Frame{start_90k: 0, duration_90k: 90379, is_key: true},
            Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
            Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
@ -392,20 +392,20 @@ mod tests {
            Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001...
            Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
        ]);
        assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 1)), &[
            Frame{start_90k: 0, duration_90k: 90011, is_key: true},
            Frame{start_90k: 90011, duration_90k: 0, is_key: false},
        ]);
        let mut recordings = Vec::new();
        db.list_recordings_by_id(testutil::TEST_STREAM_ID, 0..2, &mut |r| {
            recordings.push(r);
            Ok(())
        }).unwrap();
        assert_eq!(2, recordings.len());
        assert_eq!(0, recordings[0].id.recording());
        assert_eq!(recording::Time(128700575999999), recordings[0].start);
        assert_eq!(0, recordings[0].flags);
        assert_eq!(1, recordings[1].id.recording());
        assert_eq!(recording::Time(128700576719993), recordings[1].start);
        assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags);
    }