mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-01-12 15:33:22 -05:00
track cumulative duration and runs
This is useful for a combo scrub bar-based UI (#32) + live view UI (#59) in a non-obvious way. When constructing a HTML Media Source Extensions API SourceBuffer, the caller can specify a "mode" of either "segments" or "sequence": In "sequence" mode, playback assumes segments are added sequentially. This is good enough for a live view-only UI (#59) but not for a scrub bar UI in which you may want to seek backward to a segment you've never seen before. You will then need to insert a segment out-of-sequence. Imagine what happens when the user goes forward again until the end of the segment inserted immediately before it. The user should see the chronologically next segment or a pause for loading if it's unavailable. The best approximation of this is to track the mapping of timestamps to segments and insert a VTTCue with an enter/exit handler that seeks to the right position. But seeking isn't instantaneous; the user will likely briefly see first the segment they seeked to before. That's janky. Additionally, the "canplaythrough" event will behave strangely. In "segments" mode, playback respects the timestamps we set: * The obvious choice is to use wall clock timestamps. This is fine if they're known to be fixed and correct. They're not. The currently-recording segment may be "unanchored", meaning its start timestamp is not yet fixed. Older timestamps may overlap if the system clock was stepped between runs. The latter isn't /too/ bad from a user perspective, though it's confusing as a developer. We probably will only end up showing the more recent recording for a given timestamp anyway. But the former is quite annoying. It means we have to throw away part of the SourceBuffer that we may want to seek back (causing UI pauses when that happens) or keep our own spare copy of it (memory bloat). I'd like to avoid the whole mess. * Another approach is to use timestamps that are guaranteed to be in the correct order but that may have gaps. In particular, a timestamp of (recording_id * max_recording_duration) + time_within_recording. But again seeking isn't instantaneous. In my experiments, there's a visible pause between segments that drives me nuts. * Finally, the approach that led me to this schema change. Use timestamps that place each segment after the one before, possibly with an intentional gap between runs (to force a wait where we have an actual gap). This should make the browser's natural playback behavior work properly: it never goes to an incorrect place, and it only waits when/if we want it to. We have to maintain a mapping between its timestamps and segment ids but that's doable. This commit is only the schema change; the new data aren't exposed in the API yet, much less used by a UI. Note that stream.next_recording_id became stream.cum_recordings. I made a slight definition change in the process: recording ids for new streams start at 0 rather than 1. Various tests changed accordingly. The upgrade process makes a best effort to backfill these new fields, but of course it doesn't know the total duration or number of runs of previously deleted rows. That's good enough.
This commit is contained in:
parent
6187aa64cf
commit
f3ddbfe22a
127
db/db.rs
127
db/db.rs
@ -103,8 +103,13 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
|
||||
:rfc6381_codec, :data)
|
||||
"#;
|
||||
|
||||
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
|
||||
"update stream set next_recording_id = :next_recording_id where id = :stream_id";
|
||||
const UPDATE_STREAM_COUNTERS_SQL: &'static str = r#"
|
||||
update stream
|
||||
set cum_recordings = :cum_recordings,
|
||||
cum_duration_90k = :cum_duration_90k,
|
||||
cum_runs = :cum_runs
|
||||
where id = :stream_id
|
||||
"#;
|
||||
|
||||
pub struct FromSqlUuid(pub Uuid);
|
||||
|
||||
@ -220,13 +225,20 @@ pub enum RecordingFlags {
|
||||
Uncommitted = 1 << 31,
|
||||
}
|
||||
|
||||
/// A recording to pass to `insert_recording`.
|
||||
/// A recording to pass to `LockedDatabase::add_recording` and `raw::insert_recording`.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct RecordingToInsert {
|
||||
pub run_offset: i32,
|
||||
pub flags: i32,
|
||||
pub sample_file_bytes: i32,
|
||||
pub start: recording::Time,
|
||||
|
||||
/// Filled in by `add_recording`.
|
||||
pub prev_duration: recording::Duration,
|
||||
|
||||
/// Filled in by `add_recording`.
|
||||
pub prev_runs: i32,
|
||||
|
||||
pub duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32.
|
||||
pub local_time_delta: recording::Duration,
|
||||
pub video_samples: i32,
|
||||
@ -437,22 +449,28 @@ pub struct Stream {
|
||||
/// recordings.)
|
||||
pub bytes_to_add: i64,
|
||||
|
||||
/// The total duration of recorded data. This may not be `range.end - range.start` due to
|
||||
/// gaps and overlap.
|
||||
/// The total duration of undeleted recorded data. This may not be `range.end - range.start`
|
||||
/// due to gaps and overlap.
|
||||
pub duration: recording::Duration,
|
||||
|
||||
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
|
||||
pub days: BTreeMap<StreamDayKey, StreamDayValue>,
|
||||
pub record: bool,
|
||||
|
||||
/// The `next_recording_id` currently committed to the database.
|
||||
pub(crate) next_recording_id: i32,
|
||||
/// The `cum_recordings` currently committed to the database.
|
||||
pub(crate) cum_recordings: i32,
|
||||
|
||||
/// The `cum_duration_90k` currently committed to the database.
|
||||
cum_duration: recording::Duration,
|
||||
|
||||
/// The `cum_runs` currently committed to the database.
|
||||
cum_runs: i32,
|
||||
|
||||
/// The recordings which have been added via `LockedDatabase::add_recording` but have yet to
|
||||
/// committed to the database.
|
||||
///
|
||||
/// `uncommitted[i]` uses sample filename `CompositeId::new(id, next_recording_id + 1)`;
|
||||
/// `next_recording_id` should be advanced when one is committed to maintain this invariant.
|
||||
/// `uncommitted[i]` uses sample filename `CompositeId::new(id, cum_recordings + i)`;
|
||||
/// `cum_recordings` should be advanced when one is committed to maintain this invariant.
|
||||
///
|
||||
/// TODO: alter the serving path to show these just as if they were already committed.
|
||||
uncommitted: VecDeque<Arc<Mutex<RecordingToInsert>>>,
|
||||
@ -734,9 +752,11 @@ impl StreamStateChanger {
|
||||
// Insert stream.
|
||||
let mut stmt = tx.prepare_cached(r#"
|
||||
insert into stream (camera_id, sample_file_dir_id, type, rtsp_url, record,
|
||||
retain_bytes, flush_if_sec, next_recording_id)
|
||||
retain_bytes, flush_if_sec, cum_recordings,
|
||||
cum_duration_90k, cum_runs)
|
||||
values (:camera_id, :sample_file_dir_id, :type, :rtsp_url, :record,
|
||||
0, :flush_if_sec, 1)
|
||||
0, :flush_if_sec, 0,
|
||||
0, 0)
|
||||
"#)?;
|
||||
stmt.execute_named(named_params!{
|
||||
":camera_id": camera_id,
|
||||
@ -781,7 +801,9 @@ impl StreamStateChanger {
|
||||
duration: recording::Duration(0),
|
||||
days: BTreeMap::new(),
|
||||
record: sc.record,
|
||||
next_recording_id: 1,
|
||||
cum_recordings: 0,
|
||||
cum_duration: recording::Duration(0),
|
||||
cum_runs: 0,
|
||||
uncommitted: VecDeque::new(),
|
||||
synced_recordings: 0,
|
||||
on_live_segment: Vec::new(),
|
||||
@ -820,18 +842,35 @@ impl LockedDatabase {
|
||||
pub fn flushes(&self) -> usize { self.flush_count }
|
||||
|
||||
/// Adds a placeholder for an uncommitted recording.
|
||||
///
|
||||
/// The caller should write samples and fill the returned `RecordingToInsert` as it goes
|
||||
/// (noting that while holding the lock, it should not perform I/O or acquire the database
|
||||
/// lock). Then it should sync to permanent storage and call `mark_synced`. The data will
|
||||
/// be written to the database on the next `flush`.
|
||||
pub(crate) fn add_recording(&mut self, stream_id: i32, r: RecordingToInsert)
|
||||
///
|
||||
/// A call to `add_recording` is also a promise that previous recordings (even if not yet
|
||||
/// synced and committed) won't change.
|
||||
///
|
||||
/// This fills the `prev_duration` and `prev_runs` fields.
|
||||
pub(crate) fn add_recording(&mut self, stream_id: i32, mut r: RecordingToInsert)
|
||||
-> Result<(CompositeId, Arc<Mutex<RecordingToInsert>>), Error> {
|
||||
let stream = match self.streams_by_id.get_mut(&stream_id) {
|
||||
None => bail!("no such stream {}", stream_id),
|
||||
Some(s) => s,
|
||||
};
|
||||
let id = CompositeId::new(stream_id,
|
||||
stream.next_recording_id + (stream.uncommitted.len() as i32));
|
||||
stream.cum_recordings + (stream.uncommitted.len() as i32));
|
||||
match stream.uncommitted.back() {
|
||||
Some(s) => {
|
||||
let l = s.lock();
|
||||
r.prev_duration = l.prev_duration + recording::Duration(l.duration_90k.into());
|
||||
r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
|
||||
},
|
||||
None => {
|
||||
r.prev_duration = stream.cum_duration;
|
||||
r.prev_runs = stream.cum_runs;
|
||||
},
|
||||
};
|
||||
let recording = Arc::new(Mutex::new(r));
|
||||
stream.uncommitted.push_back(Arc::clone(&recording));
|
||||
Ok((id, recording))
|
||||
@ -844,10 +883,10 @@ impl LockedDatabase {
|
||||
None => bail!("no stream for recording {}", id),
|
||||
Some(s) => s,
|
||||
};
|
||||
let next_unsynced = stream.next_recording_id + (stream.synced_recordings as i32);
|
||||
let next_unsynced = stream.cum_recordings + (stream.synced_recordings as i32);
|
||||
if id.recording() != next_unsynced {
|
||||
bail!("can't sync {} when next unsynced recording is {} (next unflushed is {})",
|
||||
id, next_unsynced, stream.next_recording_id);
|
||||
id, next_unsynced, stream.cum_recordings);
|
||||
}
|
||||
if stream.synced_recordings == stream.uncommitted.len() {
|
||||
bail!("can't sync un-added recording {}", id);
|
||||
@ -924,19 +963,25 @@ impl LockedDatabase {
|
||||
let mut new_ranges = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
|
||||
Default::default());
|
||||
{
|
||||
let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
|
||||
let mut stmt = tx.prepare_cached(UPDATE_STREAM_COUNTERS_SQL)?;
|
||||
for (&stream_id, s) in &self.streams_by_id {
|
||||
// Process additions.
|
||||
let mut new_duration = 0;
|
||||
let mut new_runs = 0;
|
||||
for i in 0..s.synced_recordings {
|
||||
let l = s.uncommitted[i].lock();
|
||||
raw::insert_recording(
|
||||
&tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &l)?;
|
||||
&tx, o, CompositeId::new(stream_id, s.cum_recordings + i as i32), &l)?;
|
||||
new_duration += i64::from(l.duration_90k);
|
||||
new_runs += if l.run_offset == 0 { 1 } else { 0 };
|
||||
}
|
||||
if s.synced_recordings > 0 {
|
||||
new_ranges.entry(stream_id).or_insert(None);
|
||||
stmt.execute_named(named_params!{
|
||||
":stream_id": stream_id,
|
||||
":next_recording_id": s.next_recording_id + s.synced_recordings as i32,
|
||||
":cum_recordings": s.cum_recordings + s.synced_recordings as i32,
|
||||
":cum_duration_90k": s.cum_duration.0 + new_duration,
|
||||
":cum_runs": s.cum_runs + new_runs,
|
||||
})?;
|
||||
}
|
||||
|
||||
@ -1026,10 +1071,14 @@ impl LockedDatabase {
|
||||
log.added.reserve(s.synced_recordings);
|
||||
for _ in 0..s.synced_recordings {
|
||||
let u = s.uncommitted.pop_front().unwrap();
|
||||
log.added.push(CompositeId::new(stream_id, s.next_recording_id));
|
||||
s.next_recording_id += 1;
|
||||
log.added.push(CompositeId::new(stream_id, s.cum_recordings));
|
||||
let l = u.lock();
|
||||
let end = l.start + recording::Duration(l.duration_90k as i64);
|
||||
s.cum_recordings += 1;
|
||||
let dur = recording::Duration(l.duration_90k.into());
|
||||
s.cum_duration += dur;
|
||||
s.cum_runs += if l.run_offset == 0 { 1 } else { 0 };
|
||||
let end = l.start + dur;
|
||||
info!("range={:?}", l.start .. end);
|
||||
s.add_recording(l.start .. end, l.sample_file_bytes);
|
||||
}
|
||||
s.synced_recordings = 0;
|
||||
@ -1177,7 +1226,7 @@ impl LockedDatabase {
|
||||
if l.start > desired_time.end || end < desired_time.start {
|
||||
continue; // there's no overlap with the requested range.
|
||||
}
|
||||
l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32),
|
||||
l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
|
||||
self.open.unwrap().id)
|
||||
} else {
|
||||
continue;
|
||||
@ -1196,18 +1245,18 @@ impl LockedDatabase {
|
||||
None => bail!("no such stream {}", stream_id),
|
||||
Some(s) => s,
|
||||
};
|
||||
if desired_ids.start < s.next_recording_id {
|
||||
if desired_ids.start < s.cum_recordings {
|
||||
raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?;
|
||||
}
|
||||
if desired_ids.end > s.next_recording_id {
|
||||
let start = cmp::max(0, desired_ids.start - s.next_recording_id) as usize;
|
||||
let end = cmp::min((desired_ids.end - s.next_recording_id) as usize,
|
||||
if desired_ids.end > s.cum_recordings {
|
||||
let start = cmp::max(0, desired_ids.start - s.cum_recordings) as usize;
|
||||
let end = cmp::min((desired_ids.end - s.cum_recordings) as usize,
|
||||
s.uncommitted.len());
|
||||
for i in start .. end {
|
||||
let row = {
|
||||
let l = s.uncommitted[i].lock();
|
||||
if l.video_samples > 0 {
|
||||
l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32),
|
||||
l.to_list_row(CompositeId::new(stream_id, s.cum_recordings + i as i32),
|
||||
self.open.unwrap().id)
|
||||
} else {
|
||||
continue;
|
||||
@ -1300,11 +1349,11 @@ impl LockedDatabase {
|
||||
let s = self.streams_by_id
|
||||
.get(&id.stream())
|
||||
.ok_or_else(|| format_err!("no stream for {}", id))?;
|
||||
if s.next_recording_id <= id.recording() {
|
||||
let i = id.recording() - s.next_recording_id;
|
||||
if s.cum_recordings <= id.recording() {
|
||||
let i = id.recording() - s.cum_recordings;
|
||||
if i as usize >= s.uncommitted.len() {
|
||||
bail!("no such recording {}; latest committed is {}, latest is {}",
|
||||
id, s.next_recording_id, s.next_recording_id + s.uncommitted.len() as i32);
|
||||
id, s.cum_recordings, s.cum_recordings + s.uncommitted.len() as i32);
|
||||
}
|
||||
let l = s.uncommitted[i as usize].lock();
|
||||
return f(&RecordingPlayback { video_index: &l.video_index });
|
||||
@ -1474,7 +1523,9 @@ impl LockedDatabase {
|
||||
rtsp_url,
|
||||
retain_bytes,
|
||||
flush_if_sec,
|
||||
next_recording_id,
|
||||
cum_recordings,
|
||||
cum_duration_90k,
|
||||
cum_runs,
|
||||
record
|
||||
from
|
||||
stream;
|
||||
@ -1507,8 +1558,10 @@ impl LockedDatabase {
|
||||
bytes_to_add: 0,
|
||||
duration: recording::Duration(0),
|
||||
days: BTreeMap::new(),
|
||||
next_recording_id: row.get(7)?,
|
||||
record: row.get(8)?,
|
||||
cum_recordings: row.get(7)?,
|
||||
cum_duration: recording::Duration(row.get(8)?),
|
||||
cum_runs: row.get(9)?,
|
||||
record: row.get(10)?,
|
||||
uncommitted: VecDeque::new(),
|
||||
synced_recordings: 0,
|
||||
on_live_segment: Vec::new(),
|
||||
@ -2310,12 +2363,14 @@ mod tests {
|
||||
}
|
||||
let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid };
|
||||
assert_no_recordings(&db, camera_uuid);
|
||||
assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 0);
|
||||
|
||||
// Closing and reopening the database should present the same contents.
|
||||
let conn = db.close();
|
||||
let db = Database::new(clock::RealClocks {}, conn, true).unwrap();
|
||||
assert_eq!(db.lock().streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec, 2);
|
||||
assert_no_recordings(&db, camera_uuid);
|
||||
assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 0);
|
||||
|
||||
// TODO: assert_eq!(db.lock().list_garbage(sample_file_dir_id).unwrap(), &[]);
|
||||
|
||||
@ -2336,6 +2391,8 @@ mod tests {
|
||||
run_offset: 0,
|
||||
flags: 0,
|
||||
start,
|
||||
prev_duration: recording::Duration(0),
|
||||
prev_runs: 0,
|
||||
duration_90k: TIME_UNITS_PER_SEC as i32,
|
||||
local_time_delta: recording::Duration(0),
|
||||
video_samples: 1,
|
||||
@ -2351,7 +2408,7 @@ mod tests {
|
||||
db.flush("add test").unwrap();
|
||||
id
|
||||
};
|
||||
assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().next_recording_id, 2);
|
||||
assert_eq!(db.lock().streams_by_id().get(&main_stream_id).unwrap().cum_recordings, 1);
|
||||
|
||||
// Queries should return the correct result (with caches update on insert).
|
||||
assert_single_recording(&db, main_stream_id, &recording);
|
||||
|
13
db/raw.rs
13
db/raw.rs
@ -174,14 +174,15 @@ pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result<Uuid, Error> {
|
||||
|
||||
/// Inserts the specified recording (for from `try_flush` only).
|
||||
pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
|
||||
r: &db::RecordingToInsert) -> Result<(), Error> {
|
||||
r: &db::RecordingToInsert) -> Result<(), Error> {
|
||||
let mut stmt = tx.prepare_cached(r#"
|
||||
insert into recording (composite_id, stream_id, open_id, run_offset, flags,
|
||||
sample_file_bytes, start_time_90k, duration_90k,
|
||||
video_samples, video_sync_samples, video_sample_entry_id)
|
||||
sample_file_bytes, start_time_90k, prev_duration_90k,
|
||||
prev_runs, duration_90k, video_samples, video_sync_samples,
|
||||
video_sample_entry_id)
|
||||
values (:composite_id, :stream_id, :open_id, :run_offset, :flags,
|
||||
:sample_file_bytes, :start_time_90k, :duration_90k,
|
||||
:video_samples, :video_sync_samples,
|
||||
:sample_file_bytes, :start_time_90k, :prev_duration_90k,
|
||||
:prev_runs, :duration_90k, :video_samples, :video_sync_samples,
|
||||
:video_sample_entry_id)
|
||||
"#).with_context(|e| format!("can't prepare recording insert: {}", e))?;
|
||||
stmt.execute_named(named_params!{
|
||||
@ -193,6 +194,8 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
|
||||
":sample_file_bytes": r.sample_file_bytes,
|
||||
":start_time_90k": r.start.0,
|
||||
":duration_90k": r.duration_90k,
|
||||
":prev_duration_90k": r.prev_duration.0,
|
||||
":prev_runs": r.prev_runs,
|
||||
":video_samples": r.video_samples,
|
||||
":video_sync_samples": r.video_sync_samples,
|
||||
":video_sample_entry_id": r.video_sample_entry_id,
|
||||
|
@ -145,10 +145,16 @@ create table stream (
|
||||
-- then fails again, forever.
|
||||
flush_if_sec integer not null,
|
||||
|
||||
-- The low 32 bits of the next recording id to assign for this stream.
|
||||
-- Typically this is the maximum current recording + 1, but it does
|
||||
-- not decrease if that recording is deleted.
|
||||
next_recording_id integer not null check (next_recording_id >= 0),
|
||||
-- The total number of recordings ever created on this stream, including
|
||||
-- deleted ones. This is used for assigning the next recording id.
|
||||
cum_recordings integer not null check (cum_recordings >= 0),
|
||||
|
||||
-- The total duration of all recordings ever created on this stream.
|
||||
cum_duration_90k integer not null check (cum_duration_90k >= 0),
|
||||
|
||||
-- The total number of runs (recordings with run_offset = 0) ever created
|
||||
-- on this stream.
|
||||
cum_runs integer not null check (cum_runs >= 0),
|
||||
|
||||
unique (camera_id, type)
|
||||
);
|
||||
@ -158,12 +164,13 @@ create table stream (
|
||||
create table recording (
|
||||
-- The high 32 bits of composite_id are taken from the stream's id, which
|
||||
-- improves locality. The low 32 bits are taken from the stream's
|
||||
-- next_recording_id (which should be post-incremented in the same
|
||||
-- cum_recordings (which should be post-incremented in the same
|
||||
-- transaction). It'd be simpler to use a "without rowid" table and separate
|
||||
-- fields to make up the primary key, but
|
||||
-- <https://www.sqlite.org/withoutrowid.html> points out that "without rowid"
|
||||
-- is not appropriate when the average row size is in excess of 50 bytes.
|
||||
-- recording_cover rows (which match this id format) are typically 1--5 KiB.
|
||||
-- <https://www.sqlite.org/withoutrowid.html> points out that "without
|
||||
-- rowid" is not appropriate when the average row size is in excess of 50
|
||||
-- bytes. recording_cover rows (which match this id format) are typically
|
||||
-- 1--5 KiB.
|
||||
composite_id integer primary key,
|
||||
|
||||
-- The open in which this was committed to the database. For a given
|
||||
@ -172,13 +179,13 @@ create table recording (
|
||||
-- This field allows disambiguation in etags and such.
|
||||
open_id integer not null references open (id),
|
||||
|
||||
-- This field is redundant with id above, but used to enforce the reference
|
||||
-- constraint and to structure the recording_start_time index.
|
||||
-- This field is redundant with composite_id above, but used to enforce the
|
||||
-- reference constraint and to structure the recording_start_time index.
|
||||
stream_id integer not null references stream (id),
|
||||
|
||||
-- The offset of this recording within a run. 0 means this was the first
|
||||
-- recording made from a RTSP session. The start of the run has id
|
||||
-- (id-run_offset).
|
||||
-- recording made from a RTSP session. The start of the run has composite_id
|
||||
-- (composite_id-run_offset).
|
||||
run_offset integer not null,
|
||||
|
||||
-- flags is a bitmask:
|
||||
@ -193,9 +200,18 @@ create table recording (
|
||||
-- The starting time of the recording, in 90 kHz units since
|
||||
-- 1970-01-01 00:00:00 UTC excluding leap seconds. Currently on initial
|
||||
-- connection, this is taken from the local system time; on subsequent
|
||||
-- recordings, it exactly matches the previous recording's end time.
|
||||
-- recordings in a run, it exactly matches the previous recording's end
|
||||
-- time.
|
||||
start_time_90k integer not null check (start_time_90k > 0),
|
||||
|
||||
-- The total duration of all previous recordings on this stream. This is
|
||||
-- returned in API requests and may be helpful for timestamps in a HTML
|
||||
-- MediaSourceExtensions SourceBuffer.
|
||||
prev_duration_90k integer not null check (prev_duration_90k >= 0),
|
||||
|
||||
-- The total number of previous runs (rows in which run_offset = 0).
|
||||
prev_runs integer not null check (prev_runs >= 0),
|
||||
|
||||
-- The duration of the recording, in 90 kHz units.
|
||||
duration_90k integer not null
|
||||
check (duration_90k >= 0 and duration_90k < 5*60*90000),
|
||||
@ -280,7 +296,7 @@ create table recording_playback (
|
||||
|
||||
-- Files which are to be deleted (may or may not still exist).
|
||||
-- Note that besides these files, for each stream, any recordings >= its
|
||||
-- next_recording_id should be discarded on startup.
|
||||
-- cum_recordings should be discarded on startup.
|
||||
create table garbage (
|
||||
-- This is _mostly_ redundant with composite_id, which contains the stream
|
||||
-- id and thus a linkage to the sample file directory. Listing it here
|
||||
|
@ -141,6 +141,38 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
})?;
|
||||
}
|
||||
tx.execute_batch(r#"
|
||||
alter table stream rename to old_stream;
|
||||
create table stream (
|
||||
id integer primary key,
|
||||
camera_id integer not null references camera (id),
|
||||
sample_file_dir_id integer references sample_file_dir (id),
|
||||
type text not null check (type in ('main', 'sub')),
|
||||
record integer not null check (record in (1, 0)),
|
||||
rtsp_url text not null,
|
||||
retain_bytes integer not null check (retain_bytes >= 0),
|
||||
flush_if_sec integer not null,
|
||||
cum_recordings integer not null check (cum_recordings >= 0),
|
||||
cum_duration_90k integer not null check (cum_duration_90k >= 0),
|
||||
cum_runs integer not null check (cum_runs >= 0),
|
||||
unique (camera_id, type)
|
||||
);
|
||||
insert into stream
|
||||
select
|
||||
s.id,
|
||||
s.camera_id,
|
||||
s.sample_file_dir_id,
|
||||
s.type,
|
||||
s.record,
|
||||
s.rtsp_url,
|
||||
s.retain_bytes,
|
||||
s.flush_if_sec,
|
||||
s.next_recording_id as cum_recordings,
|
||||
coalesce(sum(r.duration_90k), 0) as cum_duration_90k,
|
||||
coalesce(sum(case when r.run_offset = 0 then 1 else 0 end), 0) as cum_runs
|
||||
from
|
||||
old_stream s left join recording r on (s.id = r.stream_id)
|
||||
group by 1;
|
||||
|
||||
alter table recording rename to old_recording;
|
||||
create table recording (
|
||||
composite_id integer primary key,
|
||||
@ -150,6 +182,8 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
flags integer not null,
|
||||
sample_file_bytes integer not null check (sample_file_bytes > 0),
|
||||
start_time_90k integer not null check (start_time_90k > 0),
|
||||
prev_duration_90k integer not null check (prev_duration_90k >= 0),
|
||||
prev_runs integer not null check (prev_runs >= 0),
|
||||
duration_90k integer not null
|
||||
check (duration_90k >= 0 and duration_90k < 5*60*90000),
|
||||
video_samples integer not null check (video_samples > 0),
|
||||
@ -157,7 +191,77 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
video_sample_entry_id integer references video_sample_entry (id),
|
||||
check (composite_id >> 32 = stream_id)
|
||||
);
|
||||
insert into recording select * from old_recording;
|
||||
"#)?;
|
||||
|
||||
// SQLite added window functions in 3.25.0. macOS still ships SQLite 3.24.0 (no support).
|
||||
// Compute cumulative columns by hand.
|
||||
let mut cur_stream_id = None;
|
||||
let mut cum_duration_90k = 0;
|
||||
let mut cum_runs = 0;
|
||||
let mut stmt = tx.prepare(r#"
|
||||
select
|
||||
composite_id,
|
||||
open_id,
|
||||
stream_id,
|
||||
run_offset,
|
||||
flags,
|
||||
sample_file_bytes,
|
||||
start_time_90k,
|
||||
duration_90k,
|
||||
video_samples,
|
||||
video_sync_samples,
|
||||
video_sample_entry_id
|
||||
from
|
||||
old_recording
|
||||
order by composite_id
|
||||
"#)?;
|
||||
let mut insert = tx.prepare(r#"
|
||||
insert into recording (composite_id, open_id, stream_id, run_offset, flags,
|
||||
sample_file_bytes, start_time_90k, prev_duration_90k, prev_runs,
|
||||
duration_90k, video_samples, video_sync_samples,
|
||||
video_sample_entry_id)
|
||||
values (:composite_id, :open_id, :stream_id, :run_offset, :flags,
|
||||
:sample_file_bytes, :start_time_90k, :prev_duration_90k, :prev_runs,
|
||||
:duration_90k, :video_samples, :video_sync_samples,
|
||||
:video_sample_entry_id)
|
||||
"#)?;
|
||||
let mut rows = stmt.query(params![])?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let composite_id: i64 = row.get(0)?;
|
||||
let open_id: i32 = row.get(1)?;
|
||||
let stream_id: i32 = row.get(2)?;
|
||||
let run_offset: i32 = row.get(3)?;
|
||||
let flags: i32 = row.get(4)?;
|
||||
let sample_file_bytes: i32 = row.get(5)?;
|
||||
let start_time_90k: i64 = row.get(6)?;
|
||||
let duration_90k: i32 = row.get(7)?;
|
||||
let video_samples: i32 = row.get(8)?;
|
||||
let video_sync_samples: i32 = row.get(9)?;
|
||||
let video_sample_entry_id: i32 = row.get(10)?;
|
||||
if cur_stream_id != Some(stream_id) {
|
||||
cum_duration_90k = 0;
|
||||
cum_runs = 0;
|
||||
cur_stream_id = Some(stream_id);
|
||||
}
|
||||
insert.execute_named(named_params!{
|
||||
":composite_id": composite_id,
|
||||
":open_id": open_id,
|
||||
":stream_id": stream_id,
|
||||
":run_offset": run_offset,
|
||||
":flags": flags,
|
||||
":sample_file_bytes": sample_file_bytes,
|
||||
":start_time_90k": start_time_90k,
|
||||
":prev_duration_90k": cum_duration_90k,
|
||||
":prev_runs": cum_runs,
|
||||
":duration_90k": duration_90k,
|
||||
":video_samples": video_samples,
|
||||
":video_sync_samples": video_sync_samples,
|
||||
":video_sample_entry_id": video_sample_entry_id,
|
||||
})?;
|
||||
cum_duration_90k += duration_90k;
|
||||
cum_runs += if run_offset == 0 { 1 } else { 0 };
|
||||
}
|
||||
tx.execute_batch(r#"
|
||||
drop index recording_cover;
|
||||
create index recording_cover on recording (
|
||||
stream_id,
|
||||
@ -172,7 +276,6 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
flags
|
||||
);
|
||||
|
||||
|
||||
alter table recording_integrity rename to old_recording_integrity;
|
||||
create table recording_integrity (
|
||||
composite_id integer primary key references recording (composite_id),
|
||||
@ -201,6 +304,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error>
|
||||
drop table old_recording_playback;
|
||||
drop table old_recording_integrity;
|
||||
drop table old_recording;
|
||||
drop table old_stream;
|
||||
drop table old_video_sample_entry;
|
||||
|
||||
update user_session
|
||||
|
26
db/writer.rs
26
db/writer.rs
@ -291,7 +291,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
|
||||
.iter()
|
||||
.filter_map(|(&k, v)| {
|
||||
if v.sample_file_dir_id == Some(dir_id) {
|
||||
Some((k, v.next_recording_id))
|
||||
Some((k, v.cum_recordings))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@ -497,7 +497,7 @@ impl<C: Clocks + Clone, D: DirWriter> Syncer<C, D> {
|
||||
}
|
||||
};
|
||||
|
||||
if s.next_recording_id <= f.recording.recording() { // not yet committed.
|
||||
if s.cum_recordings <= f.recording.recording() { // not yet committed.
|
||||
break;
|
||||
}
|
||||
|
||||
@ -1008,7 +1008,7 @@ mod tests {
|
||||
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
||||
video_sample_entry_id);
|
||||
let f = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
|
||||
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
||||
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
|
||||
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
@ -1025,13 +1025,13 @@ mod tests {
|
||||
|
||||
// Then a 1-byte recording.
|
||||
let f = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
||||
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
|
||||
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
w.write(b"4", recording::Time(3), 1, true).unwrap();
|
||||
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new({
|
||||
let db = h.db.clone();
|
||||
move |_| {
|
||||
// The drop(w) below should cause the old recording to be deleted (moved to
|
||||
@ -1096,9 +1096,9 @@ mod tests {
|
||||
}).unwrap();
|
||||
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
||||
video_sample_entry_id);
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1), Box::new(|_id| Err(nix_eio()))));
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0), Box::new(|_id| Err(nix_eio()))));
|
||||
let f = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
|
||||
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
||||
f.expect(MockFileAction::Write(Box::new(|buf| {
|
||||
assert_eq!(buf, b"1234");
|
||||
@ -1167,7 +1167,7 @@ mod tests {
|
||||
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
||||
video_sample_entry_id);
|
||||
let f = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
|
||||
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
||||
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
|
||||
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
@ -1185,13 +1185,13 @@ mod tests {
|
||||
|
||||
// Then a 1-byte recording.
|
||||
let f = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
Box::new({ let f = f.clone(); move |_id| Ok(f.clone()) })));
|
||||
f.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
|
||||
f.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
w.write(b"4", recording::Time(3), 1, true).unwrap();
|
||||
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new({
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new({
|
||||
let db = h.db.clone();
|
||||
move |_| {
|
||||
// The drop(w) below should cause the old recording to be deleted (moved to
|
||||
@ -1208,7 +1208,7 @@ mod tests {
|
||||
Err(nix_eio()) // force a retry.
|
||||
}
|
||||
})));
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 1), Box::new(|_| Ok(()))));
|
||||
h.dir.expect(MockDirAction::Unlink(CompositeId::new(1, 0), Box::new(|_| Ok(()))));
|
||||
h.dir.expect(MockDirAction::Sync(Box::new(|| Err(nix_eio()))));
|
||||
h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(()))));
|
||||
|
||||
@ -1264,7 +1264,7 @@ mod tests {
|
||||
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
||||
video_sample_entry_id);
|
||||
let f1 = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 0),
|
||||
Box::new({ let f = f1.clone(); move |_id| Ok(f.clone()) })));
|
||||
f1.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"123"); Ok(3) })));
|
||||
f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
@ -1285,7 +1285,7 @@ mod tests {
|
||||
let mut w = Writer::new(&h.dir, &h.db, &h.channel, testutil::TEST_STREAM_ID,
|
||||
video_sample_entry_id);
|
||||
let f2 = MockFile::new();
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 2),
|
||||
h.dir.expect(MockDirAction::Create(CompositeId::new(1, 1),
|
||||
Box::new({ let f = f2.clone(); move |_id| Ok(f.clone()) })));
|
||||
f2.expect(MockFileAction::Write(Box::new(|buf| { assert_eq!(buf, b"4"); Ok(1) })));
|
||||
f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(()))));
|
||||
|
@ -386,7 +386,7 @@ Precondition: database open read-write.
|
||||
|
||||
1. Remove all sample files (of all three categories described below:
|
||||
`recording` table rows, `garbage` table rows, and files with recording
|
||||
ids >= their stream's `next_recording_id`); see "delete a recording"
|
||||
ids >= their stream's `cum_recordings`); see "delete a recording"
|
||||
procedure below.
|
||||
2. Rewrite the directory metadata with `in_progress_open` set to the current open,
|
||||
`last_complete_open` cleared.
|
||||
@ -403,7 +403,7 @@ three invariants about sample files:
|
||||
2. Exactly one of the following statements is true for every sample file:
|
||||
* It has a `recording` table row.
|
||||
* It has a `garbage` table row.
|
||||
* Its recording id is greater than or equal to the `next_recording_id`
|
||||
* Its recording id is greater than or equal to the `cum_recordings`
|
||||
for its stream.
|
||||
3. After an orderly shutdown of Moonfire NVR, there is a `recording` table row
|
||||
for every sample file, even if there have been previous crashes.
|
||||
@ -441,11 +441,11 @@ These invariants are updated through the following procedure:
|
||||
|
||||
1. Acquire a lock to guarantee this is the only Moonfire NVR process running
|
||||
against the given database. This lock is not released until program shutdown.
|
||||
2. Query `garbage` table and `next_recording_id` field in the `stream` table.
|
||||
2. Query `garbage` table and `cum_recordings` field in the `stream` table.
|
||||
3. `unlink()` all the sample files associated with garbage rows, ignoring
|
||||
`ENOENT`.
|
||||
4. For each stream, `unlink()` all the existing files with recording ids >=
|
||||
`next_recording_id`.
|
||||
`cum_recordings`.
|
||||
4. `fsync()` the sample file directory.
|
||||
5. Delete all rows from the `garbage` table.
|
||||
|
||||
|
@ -254,5 +254,9 @@ Version 6 adds over version 5:
|
||||
or Blake2b (for sessions).
|
||||
* a preliminary schema for [object
|
||||
detection](https://en.wikipedia.org/wiki/Object_detection).
|
||||
* for each recording row, the cumulative total duration and "runs" recorded
|
||||
before it on that stream. This is useful for MediaSourceExtension-based
|
||||
web browser UIs when setting timestamps of video segments in the
|
||||
SourceBuffer.
|
||||
|
||||
On upgrading to this version, sessions will be revoked.
|
||||
|
11
src/mp4.rs
11
src/mp4.rs
@ -1863,7 +1863,8 @@ mod tests {
|
||||
let db = tdb.db.lock();
|
||||
db.list_recordings_by_time(TEST_STREAM_ID, all_time, &mut |r| {
|
||||
let d = r.duration_90k;
|
||||
assert!(skip_90k + shorten_90k < d);
|
||||
assert!(skip_90k + shorten_90k < d, "skip_90k={} shorten_90k={} r={:?}",
|
||||
skip_90k, shorten_90k, r);
|
||||
builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap();
|
||||
Ok(())
|
||||
}).unwrap();
|
||||
@ -2193,7 +2194,7 @@ mod tests {
|
||||
assert_eq!("e95f2d261cdebac5b9983abeea59e8eb053dc4efac866722544c665d9de7c49d",
|
||||
hash.to_hex().as_str());
|
||||
const EXPECTED_ETAG: &'static str =
|
||||
"\"16d80691792326c314990b15f1f0387e1dd12119614fea3ecaeca88325f6000b\"";
|
||||
"\"61031ab36449b4d1186e9513b5e40df84e78bfb2807c0035b360437bb905cdd5\"";
|
||||
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
|
||||
drop(db.syncer_channel);
|
||||
db.db.lock().clear_on_flush();
|
||||
@ -2216,7 +2217,7 @@ mod tests {
|
||||
assert_eq!("77e09be8ee5ca353ca56f9a80bb7420680713c80a0831d236fac45a96aa3b3d4",
|
||||
hash.to_hex().as_str());
|
||||
const EXPECTED_ETAG: &'static str =
|
||||
"\"932883a0d7c5e464c9f1b1a62d36db670631eee7c1eefc74deb331c1f623affb\"";
|
||||
"\"8e048b22b21c9b93d889e8dfbeeb56fa1b17dc0956190f5c3acc84f6f674089f\"";
|
||||
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
|
||||
drop(db.syncer_channel);
|
||||
db.db.lock().clear_on_flush();
|
||||
@ -2239,7 +2240,7 @@ mod tests {
|
||||
assert_eq!("f9807cfc6b96a399f3a5ad62d090f55a18543a9eeb1f48d59f86564ffd9b1e84",
|
||||
hash.to_hex().as_str());
|
||||
const EXPECTED_ETAG: &'static str =
|
||||
"\"53e9e33e28bafb6af8cee2f8b71d7751874a83a3aa7782396878b3caeacec526\"";
|
||||
"\"196192eccd8be2c840dfc4073355efe5c917999641e3d0a2b87e0d2eab40267f\"";
|
||||
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
|
||||
drop(db.syncer_channel);
|
||||
db.db.lock().clear_on_flush();
|
||||
@ -2262,7 +2263,7 @@ mod tests {
|
||||
assert_eq!("5211104e1fdfe3bbc0d7d7d479933940305ff7f23201e97308db23a022ee6339",
|
||||
hash.to_hex().as_str());
|
||||
const EXPECTED_ETAG: &'static str =
|
||||
"\"f77e81297b5ca9d1c1dcf0d0f8eebbdea8d41b4c8af1917f9d3fe84de6e68a5b\"";
|
||||
"\"9e50099d86ae1c742e65f7a4151c4427f42051a87158405a35b4e5550fd05c30\"";
|
||||
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
|
||||
drop(db.syncer_channel);
|
||||
db.db.lock().clear_on_flush();
|
||||
|
@ -382,7 +382,7 @@ mod tests {
|
||||
// 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later:
|
||||
// * the first rotation is always skipped
|
||||
// * the second rotation is deferred until a key frame.
|
||||
assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 1)), &[
|
||||
assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 0)), &[
|
||||
Frame{start_90k: 0, duration_90k: 90379, is_key: true},
|
||||
Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
|
||||
Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
|
||||
@ -392,20 +392,20 @@ mod tests {
|
||||
Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001...
|
||||
Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
|
||||
]);
|
||||
assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 2)), &[
|
||||
assert_eq!(get_frames(&db, CompositeId::new(testutil::TEST_STREAM_ID, 1)), &[
|
||||
Frame{start_90k: 0, duration_90k: 90011, is_key: true},
|
||||
Frame{start_90k: 90011, duration_90k: 0, is_key: false},
|
||||
]);
|
||||
let mut recordings = Vec::new();
|
||||
db.list_recordings_by_id(testutil::TEST_STREAM_ID, 1..3, &mut |r| {
|
||||
db.list_recordings_by_id(testutil::TEST_STREAM_ID, 0..2, &mut |r| {
|
||||
recordings.push(r);
|
||||
Ok(())
|
||||
}).unwrap();
|
||||
assert_eq!(2, recordings.len());
|
||||
assert_eq!(1, recordings[0].id.recording());
|
||||
assert_eq!(0, recordings[0].id.recording());
|
||||
assert_eq!(recording::Time(128700575999999), recordings[0].start);
|
||||
assert_eq!(0, recordings[0].flags);
|
||||
assert_eq!(2, recordings[1].id.recording());
|
||||
assert_eq!(1, recordings[1].id.recording());
|
||||
assert_eq!(recording::Time(128700576719993), recordings[1].start);
|
||||
assert_eq!(db::RecordingFlags::TrailingZero as i32, recordings[1].flags);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user