diff --git a/db/db.rs b/db/db.rs index 15d12e4..197d2c5 100644 --- a/db/db.rs +++ b/db/db.rs @@ -161,6 +161,7 @@ pub struct ListAggregatedRecordingsRow { pub run_start_id: i32, pub open_id: u32, pub first_uncommitted: Option, + pub growing: bool, } /// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`. @@ -174,16 +175,18 @@ pub enum RecordingFlags { TrailingZero = 1, // These values (starting from high bit on down) are never written to the database. - Uncommitted = 2147483648, + Growing = 1 << 30, + Uncommitted = 1 << 31, } /// A recording to pass to `insert_recording`. -#[derive(Clone, Debug)] -pub(crate) struct RecordingToInsert { +#[derive(Clone, Debug, Default)] +pub struct RecordingToInsert { pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, - pub time: Range, + pub start: recording::Time, + pub duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32. pub local_time_delta: recording::Duration, pub video_samples: i32, pub video_sync_samples: i32, @@ -195,10 +198,10 @@ pub(crate) struct RecordingToInsert { impl RecordingToInsert { fn to_list_row(&self, id: CompositeId, open_id: u32) -> ListRecordingsRow { ListRecordingsRow { - start: self.time.start, + start: self.start, video_sample_entry_id: self.video_sample_entry_id, id, - duration_90k: (self.time.end - self.time.start).0 as i32, + duration_90k: self.duration_90k, video_samples: self.video_samples, video_sync_samples: self.video_sync_samples, sample_file_bytes: self.sample_file_bytes, @@ -399,7 +402,7 @@ pub struct Stream { /// `next_recording_id` should be advanced when one is committed to maintain this invariant. /// /// TODO: alter the serving path to show these just as if they were already committed. - uncommitted: VecDeque>>, + uncommitted: VecDeque>>, /// The number of recordings in `uncommitted` which are synced and ready to commit. synced_recordings: usize, @@ -410,24 +413,14 @@ impl Stream { /// Note recordings must be flushed in order, so a recording is considered unsynced if any /// before it are unsynced. pub(crate) fn unflushed(&self) -> recording::Duration { - let mut dur = recording::Duration(0); - for i in 0 .. self.synced_recordings { - let l = self.uncommitted[i].lock(); - if let Some(ref r) = l.recording { - dur += r.time.end - r.time.start; - } + let mut sum = 0; + for i in 0..self.synced_recordings { + sum += self.uncommitted[i].lock().duration_90k as i64; } - dur + recording::Duration(sum) } } -#[derive(Debug)] -pub(crate) struct UncommittedRecording { - /// The recording to add. Absent if not yet ready. - /// TODO: modify `SampleIndexEncoder` to update this record as it goes. - pub(crate) recording: Option, -} - #[derive(Clone, Debug, Default)] pub struct StreamChange { pub sample_file_dir_id: Option, @@ -764,21 +757,19 @@ impl LockedDatabase { } /// Adds a placeholder for an uncommitted recording. - /// The caller should write and sync the file and populate the returned `UncommittedRecording` - /// (noting that the database lock must not be acquired while holding the - /// `UncommittedRecording`'s lock) then call `mark_synced`. The data will be written to the - /// database on the next `flush`. - pub(crate) fn add_recording(&mut self, stream_id: i32) - -> Result<(CompositeId, Arc>), Error> { + /// The caller should write samples and fill the returned `RecordingToInsert` as it goes + /// (noting that while holding the lock, it should not perform I/O or acquire the database + /// lock). Then it should sync to permanent storage and call `mark_synced`. The data will + /// be written to the database on the next `flush`. + pub(crate) fn add_recording(&mut self, stream_id: i32, r: RecordingToInsert) + -> Result<(CompositeId, Arc>), Error> { let stream = match self.streams_by_id.get_mut(&stream_id) { None => bail!("no such stream {}", stream_id), Some(s) => s, }; let id = CompositeId::new(stream_id, stream.next_recording_id + (stream.uncommitted.len() as i32)); - let recording = Arc::new(Mutex::new(UncommittedRecording { - recording: None, - })); + let recording = Arc::new(Mutex::new(r)); stream.uncommitted.push_back(Arc::clone(&recording)); Ok((id, recording)) } @@ -799,11 +790,7 @@ impl LockedDatabase { bail!("can't sync un-added recording {}", id); } let l = stream.uncommitted[stream.synced_recordings].lock(); - let r = match l.recording.as_ref() { - None => bail!("can't sync unfinished recording {}", id), - Some(r) => r, - }; - stream.bytes_to_add += r.sample_file_bytes as i64; + stream.bytes_to_add += l.sample_file_bytes as i64; stream.synced_recordings += 1; Ok(()) } @@ -841,9 +828,8 @@ impl LockedDatabase { // Process additions. for i in 0..s.synced_recordings { let l = s.uncommitted[i].lock(); - let r = l.recording.as_ref().unwrap(); raw::insert_recording( - &tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &r)?; + &tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &l)?; } if s.synced_recordings > 0 { new_ranges.entry(stream_id).or_insert(None); @@ -911,9 +897,8 @@ impl LockedDatabase { for _ in 0..s.synced_recordings { let u = s.uncommitted.pop_front().unwrap(); let l = u.lock(); - if let Some(ref r) = l.recording { - s.add_recording(r.time.clone(), r.sample_file_bytes); - } + let end = l.start + recording::Duration(l.duration_90k as i64); + s.add_recording(l.start .. end, l.sample_file_bytes); } s.synced_recordings = 0; @@ -1036,14 +1021,15 @@ impl LockedDatabase { Some(s) => s, }; raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?; - for i in 0 .. s.synced_recordings { + for (i, u) in s.uncommitted.iter().enumerate() { let row = { - let l = s.uncommitted[i].lock(); - if let Some(ref r) = l.recording { - if r.time.start > desired_time.end || r.time.end < r.time.start { + let l = u.lock(); + if l.video_samples > 0 { + let end = l.start + recording::Duration(l.duration_90k as i64); + if l.start > desired_time.end || end < desired_time.start { continue; // there's no overlap with the requested range. } - r.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), + l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), self.open.unwrap().id) } else { continue; @@ -1066,12 +1052,14 @@ impl LockedDatabase { raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?; } if desired_ids.end > s.next_recording_id { - let start = cmp::min(0, desired_ids.start - s.next_recording_id); - for i in start .. desired_ids.end - s.next_recording_id { + let start = cmp::max(0, desired_ids.start - s.next_recording_id) as usize; + let end = cmp::min((desired_ids.end - s.next_recording_id) as usize, + s.uncommitted.len()); + for i in start .. end { let row = { - let l = s.uncommitted[i as usize].lock(); - if let Some(ref r) = l.recording { - r.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), + let l = s.uncommitted[i].lock(); + if l.video_samples > 0 { + l.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), self.open.unwrap().id) } else { continue; @@ -1122,25 +1110,31 @@ impl LockedDatabase { f(&a)?; } let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0; - let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) { - if a.time.end != row.start { - bail!("stream {} recording {} ends at {}; {} starts at {}; expected same", - stream_id, a.ids.end - 1, a.time.end, row.id, row.start); - } - a.time.end.0 += row.duration_90k as i64; - a.ids.end = recording_id + 1; - a.video_samples += row.video_samples as i64; - a.video_sync_samples += row.video_sync_samples as i64; - a.sample_file_bytes += row.sample_file_bytes as i64; - if uncommitted { - a.first_uncommitted = a.first_uncommitted.or(Some(recording_id)); - } - false - } else { - true - }; - if need_insert { - aggs.insert(run_start_id, ListAggregatedRecordingsRow{ + let growing = (row.flags & RecordingFlags::Growing as i32) != 0; + use std::collections::btree_map::Entry; + match aggs.entry(run_start_id) { + Entry::Occupied(mut e) => { + let a = e.get_mut(); + if a.time.end != row.start { + bail!("stream {} recording {} ends at {}; {} starts at {}; expected same", + stream_id, a.ids.end - 1, a.time.end, row.id, row.start); + } + if a.open_id != row.open_id { + bail!("stream {} recording {} has open id {}; {} has {}; expected same", + stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id); + } + a.time.end.0 += row.duration_90k as i64; + a.ids.end = recording_id + 1; + a.video_samples += row.video_samples as i64; + a.video_sync_samples += row.video_sync_samples as i64; + a.sample_file_bytes += row.sample_file_bytes as i64; + if uncommitted { + a.first_uncommitted = a.first_uncommitted.or(Some(recording_id)); + } + a.growing = growing; + }, + Entry::Vacant(e) => { + e.insert(ListAggregatedRecordingsRow { time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64), ids: recording_id .. recording_id+1, video_samples: row.video_samples as i64, @@ -1151,7 +1145,9 @@ impl LockedDatabase { run_start_id, open_id: row.open_id, first_uncommitted: if uncommitted { Some(recording_id) } else { None }, - }); + growing, + }); + }, }; Ok(()) })?; @@ -1177,11 +1173,7 @@ impl LockedDatabase { id, s.next_recording_id, s.next_recording_id + s.uncommitted.len() as i32); } let l = s.uncommitted[i as usize].lock(); - if let Some(ref r) = l.recording { - return f(&RecordingPlayback { video_index: &r.video_index }); - } else { - bail!("recording {} is not ready", id); - } + return f(&RecordingPlayback { video_index: &l.video_index }); } // Committed path. @@ -1861,9 +1853,10 @@ mod tests { { let db = db.lock(); let stream = db.streams_by_id().get(&stream_id).unwrap(); - assert_eq!(Some(r.time.clone()), stream.range); + let dur = recording::Duration(r.duration_90k as i64); + assert_eq!(Some(r.start .. r.start + dur), stream.range); assert_eq!(r.sample_file_bytes as i64, stream.sample_file_bytes); - assert_eq!(r.time.end - r.time.start, stream.duration); + assert_eq!(dur, stream.duration); db.cameras_by_id().get(&stream.camera_id).unwrap(); } @@ -1877,8 +1870,8 @@ mod tests { db.list_recordings_by_time(stream_id, all_time, &mut |row| { rows += 1; recording_id = Some(row.id); - assert_eq!(r.time, - row.start .. row.start + recording::Duration(row.duration_90k as i64)); + assert_eq!(r.start, row.start); + assert_eq!(r.duration_90k, row.duration_90k); assert_eq!(r.video_samples, row.video_samples); assert_eq!(r.video_sync_samples, row.video_sync_samples); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); @@ -1893,8 +1886,8 @@ mod tests { raw::list_oldest_recordings(&db.lock().conn, CompositeId::new(stream_id, 0), &mut |row| { rows += 1; assert_eq!(recording_id, Some(row.id)); - assert_eq!(r.time.start, row.start); - assert_eq!(r.time.end - r.time.start, recording::Duration(row.duration as i64)); + assert_eq!(r.start, row.start); + assert_eq!(r.duration_90k, row.duration); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); true }).unwrap(); @@ -2084,7 +2077,8 @@ mod tests { sample_file_bytes: 42, run_offset: 0, flags: 0, - time: start .. start + recording::Duration(TIME_UNITS_PER_SEC), + start, + duration_90k: TIME_UNITS_PER_SEC as i32, local_time_delta: recording::Duration(0), video_samples: 1, video_sync_samples: 1, @@ -2094,8 +2088,7 @@ mod tests { }; let id = { let mut db = db.lock(); - let (id, u) = db.add_recording(main_stream_id).unwrap(); - u.lock().recording = Some(recording.clone()); + let (id, _) = db.add_recording(main_stream_id, recording.clone()).unwrap(); db.mark_synced(id).unwrap(); db.flush("add test").unwrap(); id diff --git a/db/dir.rs b/db/dir.rs index 17bdaf8..50b6fbc 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -646,14 +646,11 @@ enum WriterState { /// with at least one sample. The sample may have zero duration. struct InnerWriter { f: fs::File, - r: Arc>, - index: recording::SampleIndexEncoder, + r: Arc>, + e: recording::SampleIndexEncoder, id: CompositeId, hasher: hash::Hasher, - /// The end time of the previous segment in this run, if any. - prev_end: Option, - /// The start time of this segment, based solely on examining the local clock after frames in /// this segment were received. Frames can suffer from various kinds of delay (initial /// buffering, encoding, and network transmission), so this time is set to far in the future on @@ -663,8 +660,6 @@ struct InnerWriter { adjuster: ClockAdjuster, - run_offset: i32, - /// A sample which has been written to disk but not added to `index`. Index writes are one /// sample behind disk writes because the duration of a sample is the difference between its /// pts and the next sample's pts. A sample is flushed when the next sample is written, when @@ -735,7 +730,7 @@ struct UnflushedSample { /// State associated with a run's previous recording; used within `Writer`. #[derive(Copy, Clone)] struct PreviousWriter { - end_time: recording::Time, + end: recording::Time, local_time_delta: recording::Duration, run_offset: i32, } @@ -762,7 +757,13 @@ impl<'a> Writer<'a> { WriterState::Open(ref mut w) => return Ok(w), WriterState::Closed(prev) => Some(prev), }; - let (id, r) = self.db.lock().add_recording(self.stream_id)?; + let (id, r) = self.db.lock().add_recording(self.stream_id, db::RecordingToInsert { + run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), + start: prev.map(|p| p.end).unwrap_or(recording::Time(i64::max_value())), + video_sample_entry_id: self.video_sample_entry_id, + flags: db::RecordingFlags::Growing as i32, + ..Default::default() + })?; let p = SampleFileDir::get_rel_pathname(id); let f = retry_forever(&mut || unsafe { self.dir.fd.openat(p.as_ptr(), libc::O_WRONLY | libc::O_EXCL | libc::O_CREAT, 0o600) @@ -771,13 +772,11 @@ impl<'a> Writer<'a> { self.state = WriterState::Open(InnerWriter { f, r, - index: recording::SampleIndexEncoder::new(), + e: recording::SampleIndexEncoder::new(), id, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, - prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, }); match self.state { @@ -812,8 +811,7 @@ impl<'a> Writer<'a> { unflushed.pts_90k, pts_90k); } let duration = w.adjuster.adjust(duration); - w.index.add_sample(duration, unflushed.len, unflushed.is_key); - w.extend_local_start(unflushed.local_time); + w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); } let mut remaining = pkt; while !remaining.is_empty() { @@ -836,7 +834,7 @@ impl<'a> Writer<'a> { pub fn close(&mut self, next_pts: Option) { self.state = match mem::replace(&mut self.state, WriterState::Unopened) { WriterState::Open(w) => { - let prev = w.close(self.channel, self.video_sample_entry_id, next_pts); + let prev = w.close(self.channel, next_pts); WriterState::Closed(prev) }, s => s, @@ -845,45 +843,42 @@ impl<'a> Writer<'a> { } impl InnerWriter { - fn extend_local_start(&mut self, pkt_local_time: recording::Time) { - let new = pkt_local_time - recording::Duration(self.index.total_duration_90k as i64); + fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool, + pkt_local_time: recording::Time) { + let mut l = self.r.lock(); + self.e.add_sample(duration_90k, bytes, is_key, &mut l); + let new = pkt_local_time - recording::Duration(l.duration_90k as i64); self.local_start = cmp::min(self.local_start, new); + if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust. + l.start = self.local_start; + } } - fn close(mut self, channel: &SyncerChannel, video_sample_entry_id: i32, - next_pts: Option) -> PreviousWriter { + fn close(mut self, channel: &SyncerChannel, next_pts: Option) -> PreviousWriter { let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); - let duration = self.adjuster.adjust(match next_pts { - None => 0, - Some(p) => (p - unflushed.pts_90k) as i32, - }); - self.index.add_sample(duration, unflushed.len, unflushed.is_key); - self.extend_local_start(unflushed.local_time); + let (duration, flags) = match next_pts { + None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), + Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0), + }; let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); - let start = self.prev_end.unwrap_or(self.local_start); - let end = start + recording::Duration(self.index.total_duration_90k as i64); - let flags = if self.index.has_trailing_zero() { db::RecordingFlags::TrailingZero as i32 } - else { 0 }; - let local_start_delta = self.local_start - start; - let recording = db::RecordingToInsert { - sample_file_bytes: self.index.sample_file_bytes, - time: start .. end, - local_time_delta: local_start_delta, - video_samples: self.index.video_samples, - video_sync_samples: self.index.video_sync_samples, - video_sample_entry_id, - video_index: self.index.video_index, - sample_file_sha1: sha1_bytes, - run_offset: self.run_offset, - flags: flags, - }; - self.r.lock().recording = Some(recording); + let (local_time_delta, run_offset, end); + self.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); + { + let mut l = self.r.lock(); + l.flags = flags; + local_time_delta = self.local_start - l.start; + l.local_time_delta = local_time_delta; + l.sample_file_sha1 = sha1_bytes; + run_offset = l.run_offset; + end = l.start + recording::Duration(l.duration_90k as i64); + } + drop(self.r); channel.async_save_recording(self.id, self.f); PreviousWriter { - end_time: end, - local_time_delta: local_start_delta, - run_offset: self.run_offset, + end, + local_time_delta, + run_offset, } } } @@ -894,7 +889,7 @@ impl<'a> Drop for Writer<'a> { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. - let _ = w.close(self.channel, self.video_sample_entry_id, None); + let _ = w.close(self.channel, None); } } } diff --git a/db/raw.rs b/db/raw.rs index bc0b99b..7c2ad78 100644 --- a/db/raw.rs +++ b/db/raw.rs @@ -191,10 +191,6 @@ pub(crate) fn get_db_uuid(conn: &rusqlite::Connection) -> Result { /// Inserts the specified recording (for from `try_flush` only). pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId, r: &db::RecordingToInsert) -> Result<(), Error> { - if r.time.end < r.time.start { - bail!("end time {} must be >= start time {}", r.time.end, r.time.start); - } - let mut stmt = tx.prepare_cached(INSERT_RECORDING_SQL)?; stmt.execute_named(&[ (":composite_id", &id.0), @@ -203,8 +199,8 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com (":run_offset", &r.run_offset), (":flags", &r.flags), (":sample_file_bytes", &r.sample_file_bytes), - (":start_time_90k", &r.time.start.0), - (":duration_90k", &(r.time.end.0 - r.time.start.0)), + (":start_time_90k", &r.start.0), + (":duration_90k", &r.duration_90k), (":local_time_delta_90k", &r.local_time_delta.0), (":video_samples", &r.video_samples), (":video_sync_samples", &r.video_sync_samples), diff --git a/db/recording.rs b/db/recording.rs index 35a5c09..f11f1ec 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -43,7 +43,7 @@ pub const DESIRED_RECORDING_DURATION: i64 = 60 * TIME_UNITS_PER_SEC; pub const MAX_RECORDING_DURATION: i64 = 5 * 60 * TIME_UNITS_PER_SEC; /// A time specified as 90,000ths of a second since 1970-01-01 00:00:00 UTC. -#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] pub struct Time(pub i64); impl Time { @@ -290,43 +290,30 @@ impl SampleIndexIterator { #[derive(Debug)] pub struct SampleIndexEncoder { - // Internal state. prev_duration_90k: i32, prev_bytes_key: i32, prev_bytes_nonkey: i32, - - // Eventual output. - // TODO: move to another struct? - pub sample_file_bytes: i32, - pub total_duration_90k: i32, - pub video_samples: i32, - pub video_sync_samples: i32, - pub video_index: Vec, } impl SampleIndexEncoder { pub fn new() -> Self { - SampleIndexEncoder{ + SampleIndexEncoder { prev_duration_90k: 0, prev_bytes_key: 0, prev_bytes_nonkey: 0, - total_duration_90k: 0, - sample_file_bytes: 0, - video_samples: 0, - video_sync_samples: 0, - video_index: Vec::new(), } } - pub fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool) { + pub fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool, + r: &mut db::RecordingToInsert) { let duration_delta = duration_90k - self.prev_duration_90k; self.prev_duration_90k = duration_90k; - self.total_duration_90k += duration_90k; - self.sample_file_bytes += bytes; - self.video_samples += 1; + r.duration_90k += duration_90k; + r.sample_file_bytes += bytes; + r.video_samples += 1; let bytes_delta = bytes - if is_key { let prev = self.prev_bytes_key; - self.video_sync_samples += 1; + r.video_sync_samples += 1; self.prev_bytes_key = bytes; prev } else { @@ -334,11 +321,9 @@ impl SampleIndexEncoder { self.prev_bytes_nonkey = bytes; prev }; - append_varint32((zigzag32(duration_delta) << 1) | (is_key as u32), &mut self.video_index); - append_varint32(zigzag32(bytes_delta), &mut self.video_index); + append_varint32((zigzag32(duration_delta) << 1) | (is_key as u32), &mut r.video_index); + append_varint32(zigzag32(bytes_delta), &mut r.video_index); } - - pub fn has_trailing_zero(&self) -> bool { self.prev_duration_90k == 0 } } /// A segment represents a view of some or all of a single recording, starting from a key frame. @@ -566,16 +551,17 @@ mod tests { #[test] fn test_encode_example() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut e = SampleIndexEncoder::new(); - e.add_sample(10, 1000, true); - e.add_sample(9, 10, false); - e.add_sample(11, 15, false); - e.add_sample(10, 12, false); - e.add_sample(10, 1050, true); - assert_eq!(e.video_index, b"\x29\xd0\x0f\x02\x14\x08\x0a\x02\x05\x01\x64"); - assert_eq!(10 + 9 + 11 + 10 + 10, e.total_duration_90k); - assert_eq!(5, e.video_samples); - assert_eq!(2, e.video_sync_samples); + e.add_sample(10, 1000, true, &mut r); + e.add_sample(9, 10, false, &mut r); + e.add_sample(11, 15, false, &mut r); + e.add_sample(10, 12, false, &mut r); + e.add_sample(10, 1050, true, &mut r); + assert_eq!(r.video_index, b"\x29\xd0\x0f\x02\x14\x08\x0a\x02\x05\x01\x64"); + assert_eq!(10 + 9 + 11 + 10 + 10, r.duration_90k); + assert_eq!(5, r.video_samples); + assert_eq!(2, r.video_sync_samples); } /// Tests a round trip from `SampleIndexEncoder` to `SampleIndexIterator`. @@ -595,19 +581,20 @@ mod tests { Sample{duration_90k: 18, bytes: 31000, is_key: true}, Sample{duration_90k: 0, bytes: 1000, is_key: false}, ]; + let mut r = db::RecordingToInsert::default(); let mut e = SampleIndexEncoder::new(); for sample in &samples { - e.add_sample(sample.duration_90k, sample.bytes, sample.is_key); + e.add_sample(sample.duration_90k, sample.bytes, sample.is_key, &mut r); } let mut it = SampleIndexIterator::new(); for sample in &samples { - assert!(it.next(&e.video_index).unwrap()); + assert!(it.next(&r.video_index).unwrap()); assert_eq!(sample, &Sample{duration_90k: it.duration_90k, bytes: it.bytes, is_key: it.is_key()}); } - assert!(!it.next(&e.video_index).unwrap()); + assert!(!it.next(&r.video_index).unwrap()); } /// Tests that `SampleIndexIterator` spots several classes of errors. @@ -649,14 +636,15 @@ mod tests { #[test] fn test_segment_clipping_with_all_sync() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, true); + encoder.add_sample(duration_90k, bytes, true, &mut r); } let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); // Time range [2, 2 + 4 + 6 + 8) means the 2nd, 3rd, 4th samples should be // included. let segment = Segment::new(&db.db.lock(), &row, 2 .. 2+4+6+8).unwrap(); @@ -667,14 +655,15 @@ mod tests { #[test] fn test_segment_clipping_with_half_sync() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); // Time range [2 + 4 + 6, 2 + 4 + 6 + 8) means the 4th sample should be included. // The 3rd also gets pulled in because it is a sync frame and the 4th is not. let segment = Segment::new(&db.db.lock(), &row, 2+4+6 .. 2+4+6+8).unwrap(); @@ -684,12 +673,13 @@ mod tests { #[test] fn test_segment_clipping_with_trailing_zero() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true); - encoder.add_sample(1, 2, true); - encoder.add_sample(0, 3, true); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(1, 2, true, &mut r); + encoder.add_sample(0, 3, true, &mut r); let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 1 .. 2).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[2, 3]); } @@ -698,10 +688,11 @@ mod tests { #[test] fn test_segment_zero_desired_duration() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true); + encoder.add_sample(1, 1, true, &mut r); let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 0).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1]); } @@ -711,14 +702,15 @@ mod tests { #[test] fn test_segment_fast_path() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 2+4+6+8+10).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.duration_90k), &[2, 4, 6, 8, 10]); } @@ -726,12 +718,13 @@ mod tests { #[test] fn test_segment_fast_path_with_trailing_zero() { testutil::init(); + let mut r = db::RecordingToInsert::default(); let mut encoder = SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true); - encoder.add_sample(1, 2, true); - encoder.add_sample(0, 3, true); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(1, 2, true, &mut r); + encoder.add_sample(0, 3, true, &mut r); let db = TestDb::new(); - let row = db.create_recording_from_encoder(encoder); + let row = db.insert_recording_from_encoder(r); let segment = Segment::new(&db.db.lock(), &row, 0 .. 2).unwrap(); assert_eq!(&get_frames(&db.db, &segment, |it| it.bytes), &[1, 2, 3]); } diff --git a/db/testutil.rs b/db/testutil.rs index 7c5e4f2..8e57373 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -34,7 +34,6 @@ use db; use dir; use fnv::FnvHashMap; use mylog; -use recording::{self, TIME_UNITS_PER_SEC}; use rusqlite; use std::env; use std::sync::{self, Arc}; @@ -125,26 +124,20 @@ impl TestDb { } } - pub fn create_recording_from_encoder(&self, encoder: recording::SampleIndexEncoder) - -> db::ListRecordingsRow { + /// Creates a recording with a fresh `RecordingToInsert` row which has been touched only by + /// a `SampleIndexEncoder`. Fills in a video sample entry id and such to make it valid. + /// There will no backing sample file, so it won't be possible to generate a full `.mp4`. + pub fn insert_recording_from_encoder(&self, r: db::RecordingToInsert) + -> db::ListRecordingsRow { + use recording::{self, TIME_UNITS_PER_SEC}; let mut db = self.db.lock(); let video_sample_entry_id = db.insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); - const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap(); - u.lock().recording = Some(db::RecordingToInsert { - sample_file_bytes: encoder.sample_file_bytes, - time: START_TIME .. - START_TIME + recording::Duration(encoder.total_duration_90k as i64), - local_time_delta: recording::Duration(0), - video_samples: encoder.video_samples, - video_sync_samples: encoder.video_sync_samples, - video_sample_entry_id: video_sample_entry_id, - video_index: encoder.video_index, - sample_file_sha1: [0u8; 20], - run_offset: 0, - flags: db::RecordingFlags::TrailingZero as i32, - }); + let (id, _) = db.add_recording(TEST_STREAM_ID, db::RecordingToInsert { + start: recording::Time(1430006400i64 * TIME_UNITS_PER_SEC), + video_sample_entry_id, + ..r + }).unwrap(); db.mark_synced(id).unwrap(); db.flush("create_recording_from_encoder").unwrap(); let mut row = None; @@ -157,30 +150,26 @@ impl TestDb { // For benchmarking #[cfg(feature="nightly")] pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { + use recording::{self, TIME_UNITS_PER_SEC}; let mut data = Vec::new(); data.extend_from_slice(include_bytes!("testdata/video_sample_index.bin")); let mut db = db.lock(); let video_sample_entry_id = db.insert_video_sample_entry( 1920, 1080, [0u8; 100].to_vec(), "avc1.000000".to_owned()).unwrap(); - const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); - const DURATION: recording::Duration = recording::Duration(5399985); let mut recording = db::RecordingToInsert { sample_file_bytes: 30104460, - flags: 0, - time: START_TIME .. (START_TIME + DURATION), - local_time_delta: recording::Duration(0), + start: recording::Time(1430006400i64 * TIME_UNITS_PER_SEC), + duration_90k: 5399985, video_samples: 1800, video_sync_samples: 60, video_sample_entry_id: video_sample_entry_id, video_index: data, - sample_file_sha1: [0; 20], run_offset: 0, + ..Default::default() }; for _ in 0..num { - let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap(); - u.lock().recording = Some(recording.clone()); - recording.time.start += DURATION; - recording.time.end += DURATION; + let (id, _) = db.add_recording(TEST_STREAM_ID, recording.clone()).unwrap(); + recording.start += recording::Duration(recording.duration_90k as i64); recording.run_offset += 1; db.mark_synced(id).unwrap(); } diff --git a/design/api.md b/design/api.md index 3b37a75..1facbae 100644 --- a/design/api.md +++ b/design/api.md @@ -175,6 +175,11 @@ Each recording object has the following properties: it's possible that after a crash and restart, this id will refer to a completely different recording. That recording will have a different `openId`. +* `growing` (optional). If this boolean is true, the recording `endId` is + still being written to. Accesses to this id (such as `view.mp4`) may + retrieve more data than described here if not bounded by duration. + Additionally, if `startId` == `endId`, the start time of the recording is + "unanchored" and may change in subsequent accesses. * `openId`. Each time Moonfire NVR starts in read-write mode, it is assigned an increasing "open id". This field is the open id as of when these recordings were written. This can be used to disambiguate ids referring to diff --git a/src/json.rs b/src/json.rs index 98cde99..00f27dd 100644 --- a/src/json.rs +++ b/src/json.rs @@ -32,6 +32,7 @@ use db; use failure::Error; use serde::ser::{SerializeMap, SerializeSeq, Serializer}; use std::collections::BTreeMap; +use std::ops::Not; use uuid::Uuid; #[derive(Serialize)] @@ -182,4 +183,7 @@ pub struct Recording { pub end_id: Option, pub video_sample_entry_width: u16, pub video_sample_entry_height: u16, + + #[serde(skip_serializing_if = "Not::not")] + pub growing: bool, } diff --git a/src/mp4.rs b/src/mp4.rs index 44110ad..7a35f80 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1857,12 +1857,12 @@ mod tests { /// Makes a `.mp4` file which is only good for exercising the `Slice` logic for producing /// sample tables that match the supplied encoder. fn make_mp4_from_encoders(type_: Type, db: &TestDb, - mut encoders: Vec, + mut recordings: Vec, desired_range_90k: Range) -> File { let mut builder = FileBuilder::new(type_); let mut duration_so_far = 0; - for e in encoders.drain(..) { - let row = db.create_recording_from_encoder(e); + for r in recordings.drain(..) { + let row = db.insert_recording_from_encoder(r); let d_start = if desired_range_90k.start < duration_so_far { 0 } else { desired_range_90k.start - duration_so_far }; let d_end = if desired_range_90k.end > duration_so_far + row.duration_90k @@ -1878,15 +1878,16 @@ mod tests { fn test_all_sync_frames() { testutil::init(); let db = TestDb::new(); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, true); + encoder.add_sample(duration_90k, bytes, true, &mut r); } // Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included. - let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![encoder], 2 .. 2+4+6+8); + let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8); let track = find_track(mp4, 1); assert!(track.edts_cursor.is_none()); let mut cursor = track.stbl_cursor; @@ -1931,16 +1932,17 @@ mod tests { fn test_half_sync_frames() { testutil::init(); let db = TestDb::new(); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } // Time range [2+4+6, 2+4+6+8) means the 4th sample should be included. // The 3rd gets pulled in also because it's a sync frame and the 4th isn't. - let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![encoder], 2+4+6 .. 2+4+6+8); + let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8); let track = find_track(mp4, 1); // Examine edts. It should skip the 3rd frame. @@ -1994,15 +1996,17 @@ mod tests { testutil::init(); let db = TestDb::new(); let mut encoders = Vec::new(); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(1, 1, true); - encoder.add_sample(2, 2, false); - encoder.add_sample(3, 3, true); - encoders.push(encoder); + encoder.add_sample(1, 1, true, &mut r); + encoder.add_sample(2, 2, false, &mut r); + encoder.add_sample(3, 3, true, &mut r); + encoders.push(r); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(4, 4, true); - encoder.add_sample(5, 5, false); - encoders.push(encoder); + encoder.add_sample(4, 4, true, &mut r); + encoder.add_sample(5, 5, false, &mut r); + encoders.push(r); // This should include samples 3 and 4 only, both sync frames. let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4); @@ -2029,13 +2033,15 @@ mod tests { testutil::init(); let db = TestDb::new(); let mut encoders = Vec::new(); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(2, 1, true); - encoder.add_sample(3, 2, false); - encoders.push(encoder); + encoder.add_sample(2, 1, true, &mut r); + encoder.add_sample(3, 2, false, &mut r); + encoders.push(r); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); - encoder.add_sample(0, 3, true); - encoders.push(encoder); + encoder.add_sample(0, 3, true, &mut r); + encoders.push(r); // Multi-segment recording with an edit list, encoding with a zero-duration recording. let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3); @@ -2052,16 +2058,17 @@ mod tests { fn test_media_segment() { testutil::init(); let db = TestDb::new(); + let mut r = db::RecordingToInsert::default(); let mut encoder = recording::SampleIndexEncoder::new(); for i in 1..6 { let duration_90k = 2 * i; let bytes = 3 * i; - encoder.add_sample(duration_90k, bytes, (i % 2) == 1); + encoder.add_sample(duration_90k, bytes, (i % 2) == 1, &mut r); } // Time range [2+4+6, 2+4+6+8+1) means the 4th sample and part of the 5th are included. // The 3rd gets pulled in also because it's a sync frame and the 4th isn't. - let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![encoder], + let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r], 2+4+6 .. 2+4+6+8+1); let mut cursor = BoxCursor::new(mp4); cursor.down(); diff --git a/src/web.rs b/src/web.rs index 3dd1648..45b2593 100644 --- a/src/web.rs +++ b/src/web.rs @@ -279,6 +279,7 @@ impl ServiceInner { video_sample_entry_width: vse.width, video_sample_entry_height: vse.height, video_sample_entry_sha1: strutil::hex(&vse.sha1), + growing: row.growing, }); Ok(()) })?; diff --git a/ui-src/index.js b/ui-src/index.js index 94cb456..725373e 100644 --- a/ui-src/index.js +++ b/ui-src/index.js @@ -85,6 +85,9 @@ function onSelectVideo(camera, streamType, range, recording) { if (trim && recording.endTime90k > range.endTime90k) { rel += range.endTime90k - recording.startTime90k; endTime90k = range.endTime90k; + } else if (recording.growing !== undefined) { + // View just the portion described here. + rel += recording.endTime90k - recording.startTime90k; } if (rel !== '-') { url += '.' + rel;