diff --git a/db/db.rs b/db/db.rs index 53f5b1c..15d12e4 100644 --- a/db/db.rs +++ b/db/db.rs @@ -144,6 +144,7 @@ pub struct ListRecordingsRow { pub video_sync_samples: i32, pub sample_file_bytes: i32, pub run_offset: i32, + pub open_id: u32, pub flags: i32, } @@ -157,8 +158,9 @@ pub struct ListAggregatedRecordingsRow { pub sample_file_bytes: i64, pub video_sample_entry_id: i32, pub stream_id: i32, - pub flags: i32, pub run_start_id: i32, + pub open_id: u32, + pub first_uncommitted: Option, } /// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`. @@ -170,6 +172,9 @@ pub struct RecordingPlayback<'a> { /// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`. pub enum RecordingFlags { TrailingZero = 1, + + // These values (starting from high bit on down) are never written to the database. + Uncommitted = 2147483648, } /// A recording to pass to `insert_recording`. @@ -187,6 +192,24 @@ pub(crate) struct RecordingToInsert { pub sample_file_sha1: [u8; 20], } +impl RecordingToInsert { + fn to_list_row(&self, id: CompositeId, open_id: u32) -> ListRecordingsRow { + ListRecordingsRow { + start: self.time.start, + video_sample_entry_id: self.video_sample_entry_id, + id, + duration_90k: (self.time.end - self.time.start).0 as i32, + video_samples: self.video_samples, + video_sync_samples: self.video_sync_samples, + sample_file_bytes: self.sample_file_bytes, + run_offset: self.run_offset, + open_id, + flags: self.flags | RecordingFlags::Uncommitted as i32, + } + } +} + + /// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`. #[derive(Copy, Clone, Debug)] pub(crate) struct ListOldestRecordingsRow { @@ -1000,19 +1023,64 @@ impl LockedDatabase { } } - /// Lists the specified recordings in ascending order by start time, passing them to a supplied - /// function. Given that the function is called with the database lock held, it should be quick. + /// Lists the specified recordings, passing them to a supplied function. Given that the + /// function is called with the database lock held, it should be quick. + /// + /// Note that at present, the returned recordings are _not_ completely ordered by start time. + /// Uncommitted recordings are returned id order after the others. pub fn list_recordings_by_time( &self, stream_id: i32, desired_time: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - raw::list_recordings_by_time(&self.conn, stream_id, desired_time, f) + let s = match self.streams_by_id.get(&stream_id) { + None => bail!("no such stream {}", stream_id), + Some(s) => s, + }; + raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?; + for i in 0 .. s.synced_recordings { + let row = { + let l = s.uncommitted[i].lock(); + if let Some(ref r) = l.recording { + if r.time.start > desired_time.end || r.time.end < r.time.start { + continue; // there's no overlap with the requested range. + } + r.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), + self.open.unwrap().id) + } else { + continue; + } + }; + f(row)?; + } + Ok(()) } /// Lists the specified recordings in ascending order by id. pub fn list_recordings_by_id( &self, stream_id: i32, desired_ids: Range, f: &mut FnMut(ListRecordingsRow) -> Result<(), Error>) -> Result<(), Error> { - raw::list_recordings_by_id(&self.conn, stream_id, desired_ids, f) + let s = match self.streams_by_id.get(&stream_id) { + None => bail!("no such stream {}", stream_id), + Some(s) => s, + }; + if desired_ids.start < s.next_recording_id { + raw::list_recordings_by_id(&self.conn, stream_id, desired_ids.clone(), f)?; + } + if desired_ids.end > s.next_recording_id { + let start = cmp::min(0, desired_ids.start - s.next_recording_id); + for i in start .. desired_ids.end - s.next_recording_id { + let row = { + let l = s.uncommitted[i as usize].lock(); + if let Some(ref r) = l.recording { + r.to_list_row(CompositeId::new(stream_id, s.next_recording_id + i as i32), + self.open.unwrap().id) + } else { + continue; + } + }; + f(row)?; + } + } + Ok(()) } /// Calls `list_recordings_by_time` and aggregates consecutive recordings. @@ -1035,7 +1103,8 @@ impl LockedDatabase { // is equal to the start_time of recording id r+1. Thus ascending times guarantees // ascending ids within a run. (Different runs, however, can be arbitrarily interleaved if // their timestamps overlap. Tracking all active runs prevents that interleaving from - // causing problems.) + // causing problems.) list_recordings_by_time also returns uncommitted recordings in + // ascending order by id, and after any committed recordings with lower ids. let mut aggs: BTreeMap = BTreeMap::new(); self.list_recordings_by_time(stream_id, desired_time, &mut |row| { let recording_id = row.id.recording(); @@ -1052,6 +1121,7 @@ impl LockedDatabase { let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None"); f(&a)?; } + let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0; let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) { if a.time.end != row.start { bail!("stream {} recording {} ends at {}; {} starts at {}; expected same", @@ -1062,6 +1132,9 @@ impl LockedDatabase { a.video_samples += row.video_samples as i64; a.video_sync_samples += row.video_sync_samples as i64; a.sample_file_bytes += row.sample_file_bytes as i64; + if uncommitted { + a.first_uncommitted = a.first_uncommitted.or(Some(recording_id)); + } false } else { true @@ -1076,7 +1149,8 @@ impl LockedDatabase { video_sample_entry_id: row.video_sample_entry_id, stream_id, run_start_id, - flags: row.flags, + open_id: row.open_id, + first_uncommitted: if uncommitted { Some(recording_id) } else { None }, }); }; Ok(()) @@ -1092,6 +1166,25 @@ impl LockedDatabase { /// This uses a LRU cache to reduce the number of retrievals from the database. pub fn with_recording_playback(&self, id: CompositeId, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { + // Check for uncommitted path. + let s = self.streams_by_id + .get(&id.stream()) + .ok_or_else(|| format_err!("no stream for {}", id))?; + if s.next_recording_id <= id.recording() { + let i = id.recording() - s.next_recording_id; + if i as usize >= s.uncommitted.len() { + bail!("no such recording {}; latest committed is {}, latest is {}", + id, s.next_recording_id, s.next_recording_id + s.uncommitted.len() as i32); + } + let l = s.uncommitted[i as usize].lock(); + if let Some(ref r) = l.recording { + return f(&RecordingPlayback { video_index: &r.video_index }); + } else { + bail!("recording {} is not ready", id); + } + } + + // Committed path. let mut cache = self.video_index_cache.borrow_mut(); if let Some(video_index) = cache.get_mut(&id.0) { trace!("cache hit for recording {}", id); diff --git a/db/raw.rs b/db/raw.rs index 834938e..bc0b99b 100644 --- a/db/raw.rs +++ b/db/raw.rs @@ -49,7 +49,8 @@ const LIST_RECORDINGS_BY_TIME_SQL: &'static str = r#" recording.sample_file_bytes, recording.video_samples, recording.video_sync_samples, - recording.video_sample_entry_id + recording.video_sample_entry_id, + recording.open_id from recording where @@ -71,7 +72,8 @@ const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" recording.sample_file_bytes, recording.video_samples, recording.video_sync_samples, - recording.video_sample_entry_id + recording.video_sample_entry_id, + recording.open_id from recording where @@ -173,6 +175,7 @@ fn list_recordings_inner(mut rows: rusqlite::Rows, video_samples: row.get_checked(6)?, video_sync_samples: row.get_checked(7)?, video_sample_entry_id: row.get_checked(8)?, + open_id: row.get_checked(9)?, })?; } Ok(()) diff --git a/db/recording.rs b/db/recording.rs index a4bb0dc..35a5c09 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -346,6 +346,7 @@ impl SampleIndexEncoder { #[derive(Debug)] pub struct Segment { pub id: db::CompositeId, + pub open_id: u32, pub start: Time, /// An iterator positioned at the beginning of the segment, or `None`. Most segments are @@ -373,6 +374,7 @@ impl Segment { desired_range_90k: Range) -> Result { let mut self_ = Segment { id: recording.id, + open_id: recording.open_id, start: recording.start, begin: None, file_end: recording.sample_file_bytes, diff --git a/design/api.md b/design/api.md index c35cad0..3b37a75 100644 --- a/design/api.md +++ b/design/api.md @@ -170,6 +170,15 @@ Each recording object has the following properties: together are as described. Adjacent recordings from the same RTSP session may be coalesced in this fashion to reduce the amount of redundant data transferred. +* `firstUncommitted` (optional). If this range is not fully committed to the + database, the first id that is uncommitted. This is significant because + it's possible that after a crash and restart, this id will refer to a + completely different recording. That recording will have a different + `openId`. +* `openId`. Each time Moonfire NVR starts in read-write mode, it is assigned + an increasing "open id". This field is the open id as of when these + recordings were written. This can be used to disambiguate ids referring to + uncommitted recordings. * `startTime90k`: the start time of the given recording. Note this may be less than the requested `startTime90k` if this recording was ongoing at the requested time. @@ -224,10 +233,12 @@ MIME type will be `video/mp4`, with a `codecs` parameter as specified in [RFC Expected query parameters: * `s` (one or more): a string of the form - `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. This specifies - recording segments to include. The produced `.mp4` file will be a + `START_ID[-END_ID][@OPEN_ID][.[REL_START_TIME]-[REL_END_TIME]]`. This + specifies recording segments to include. The produced `.mp4` file will be a concatenation of the segments indicated by all `s` parameters. The ids to - retrieve are as returned by the `/recordings` URL. The optional start and + retrieve are as returned by the `/recordings` URL. The open id is optional + and will be enforced if present; it's recommended for disambiguation when + the requested range includes uncommitted recordings. The optional start and end times are in 90k units and relative to the start of the first specified id. These can be used to clip the returned segments. Note they can be used to skip over some ids entirely; this is allowed so that the caller doesn't diff --git a/src/json.rs b/src/json.rs index cc9ca40..98cde99 100644 --- a/src/json.rs +++ b/src/json.rs @@ -173,6 +173,10 @@ pub struct Recording { pub video_samples: i64, pub video_sample_entry_sha1: String, pub start_id: i32, + pub open_id: u32, + + #[serde(skip_serializing_if = "Option::is_none")] + pub first_uncommitted: Option, #[serde(skip_serializing_if = "Option::is_none")] pub end_id: Option, diff --git a/src/mp4.rs b/src/mp4.rs index 5e41d95..44110ad 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -804,10 +804,11 @@ impl FileBuilder { } // Update the etag to reflect this segment. - let mut data = [0_u8; 24]; + let mut data = [0_u8; 28]; let mut cursor = io::Cursor::new(&mut data[..]); cursor.write_i64::(s.s.id.0)?; cursor.write_i64::(s.s.start.0)?; + cursor.write_u32::(s.s.open_id)?; cursor.write_i32::(d.start)?; cursor.write_i32::(d.end)?; etag.update(cursor.into_inner())?; @@ -2103,7 +2104,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "c56ef7eb3b4a713ceafebc3dc7958bd9e62a2fae"; + const EXPECTED_ETAG: &'static str = "04298efb2df0cc45a6cea65dfdf2e817a3b42ca8"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); @@ -2124,7 +2125,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "3bdc2c8ce521df50155d0ca4d7497ada448fa7c3"; + const EXPECTED_ETAG: &'static str = "16a4f6348560c3de0d149675dccba21ef7906be3"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); @@ -2145,7 +2146,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("d655945f94e18e6ed88a2322d27522aff6f76403", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "3986d3bd9b866c3455fb7359fb134aa2d9107af7"; + const EXPECTED_ETAG: &'static str = "80e418b029e81aa195f90aa6b806015a5030e5be"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); @@ -2166,7 +2167,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "9e789398c9a71ca834fec8fbc55b389f99d12dda"; + const EXPECTED_ETAG: &'static str = "5bfea0f20108a7c5b77ef1e21d82ef2abc29540f"; assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); diff --git a/src/web.rs b/src/web.rs index 830e4ec..3dd1648 100644 --- a/src/web.rs +++ b/src/web.rs @@ -61,8 +61,9 @@ use uuid::Uuid; lazy_static! { /// Regex used to parse the `s` query parameter to `view.mp4`. /// As described in `design/api.md`, this is of the form - /// `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. - static ref SEGMENTS_RE: Regex = Regex::new(r"^(\d+)(-\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap(); + /// `START_ID[-END_ID][@OPEN_ID][.[REL_START_TIME]-[REL_END_TIME]]`. + static ref SEGMENTS_RE: Regex = + Regex::new(r"^(\d+)(-\d+)?(@\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap(); } enum Path { @@ -135,6 +136,7 @@ fn decode_path(path: &str) -> Path { #[derive(Debug, Eq, PartialEq)] struct Segments { ids: Range, + open_id: Option, start_time: i64, end_time: Option, } @@ -144,17 +146,21 @@ impl Segments { let caps = SEGMENTS_RE.captures(input).ok_or(())?; let ids_start = i32::from_str(caps.get(1).unwrap().as_str()).map_err(|_| ())?; let ids_end = match caps.get(2) { - Some(e) => i32::from_str(&e.as_str()[1..]).map_err(|_| ())?, + Some(m) => i32::from_str(&m.as_str()[1..]).map_err(|_| ())?, None => ids_start, } + 1; + let open_id = match caps.get(3) { + Some(m) => Some(u32::from_str(&m.as_str()[1..]).map_err(|_| ())?), + None => None, + }; if ids_start < 0 || ids_end <= ids_start { return Err(()); } - let start_time = caps.get(3).map_or(Ok(0), |m| i64::from_str(m.as_str())).map_err(|_| ())?; + let start_time = caps.get(4).map_or(Ok(0), |m| i64::from_str(m.as_str())).map_err(|_| ())?; if start_time < 0 { return Err(()); } - let end_time = match caps.get(4) { + let end_time = match caps.get(5) { Some(v) => { let e = i64::from_str(v.as_str()).map_err(|_| ())?; if e <= start_time { @@ -164,10 +170,11 @@ impl Segments { }, None => None }; - Ok(Segments{ + Ok(Segments { ids: ids_start .. ids_end, - start_time: start_time, - end_time: end_time, + open_id, + start_time, + end_time, }) } } @@ -262,10 +269,12 @@ impl ServiceInner { let vse = db.video_sample_entries_by_id().get(&row.video_sample_entry_id).unwrap(); out.recordings.push(json::Recording { start_id: row.ids.start, - end_id: if end == row.ids.start + 1 { None } else { Some(end) }, + end_id: if end == row.ids.start { None } else { Some(end) }, start_time_90k: row.time.start.0, end_time_90k: row.time.end.0, sample_file_bytes: row.sample_file_bytes, + open_id: row.open_id, + first_uncommitted: row.first_uncommitted, video_samples: row.video_samples, video_sample_entry_width: vse.width, video_sample_entry_height: vse.height, @@ -331,6 +340,13 @@ impl ServiceInner { db.list_recordings_by_id(stream_id, s.ids.clone(), &mut |r| { let recording_id = r.id.recording(); + if let Some(o) = s.open_id { + if r.open_id != o { + bail!("recording {} has open id {}, requested {}", + r.id, r.open_id, o); + } + } + // Check for missing recordings. match prev { None if recording_id == s.ids.start => {}, @@ -507,21 +523,25 @@ mod tests { #[test] fn test_segments() { testutil::init(); - assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: None}, + assert_eq!(Segments{ids: 1..2, open_id: None, start_time: 0, end_time: None}, Segments::parse("1").unwrap()); - assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: None}, + assert_eq!(Segments{ids: 1..2, open_id: Some(42), start_time: 0, end_time: None}, + Segments::parse("1@42").unwrap()); + assert_eq!(Segments{ids: 1..2, open_id: None, start_time: 26, end_time: None}, Segments::parse("1.26-").unwrap()); - assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: Some(42)}, + assert_eq!(Segments{ids: 1..2, open_id: Some(42), start_time: 26, end_time: None}, + Segments::parse("1@42.26-").unwrap()); + assert_eq!(Segments{ids: 1..2, open_id: None, start_time: 0, end_time: Some(42)}, Segments::parse("1.-42").unwrap()); - assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: Some(42)}, + assert_eq!(Segments{ids: 1..2, open_id: None, start_time: 26, end_time: Some(42)}, Segments::parse("1.26-42").unwrap()); - assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: None}, + assert_eq!(Segments{ids: 1..6, open_id: None, start_time: 0, end_time: None}, Segments::parse("1-5").unwrap()); - assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: None}, + assert_eq!(Segments{ids: 1..6, open_id: None, start_time: 26, end_time: None}, Segments::parse("1-5.26-").unwrap()); - assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: Some(42)}, + assert_eq!(Segments{ids: 1..6, open_id: None, start_time: 0, end_time: Some(42)}, Segments::parse("1-5.-42").unwrap()); - assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: Some(42)}, + assert_eq!(Segments{ids: 1..6, open_id: None, start_time: 26, end_time: Some(42)}, Segments::parse("1-5.26-42").unwrap()); } } diff --git a/ui-src/index.js b/ui-src/index.js index 8c8c96d..94cb456 100644 --- a/ui-src/index.js +++ b/ui-src/index.js @@ -70,6 +70,9 @@ function onSelectVideo(camera, streamType, range, recording) { if (recording.endId !== undefined) { url += '-' + recording.endId; } + if (recording.firstUncommitted !== undefined) { + url += '@' + recording.openId; // disambiguate. + } const trim = $("#trim").prop("checked"); let rel = ''; let startTime90k = recording.startTime90k;