From 3ba3bf2b18d86768d889d3dad0e171f5ca841509 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Mon, 21 Jan 2019 15:58:52 -0800 Subject: [PATCH] backend support for live stream (#59) This is so far completely untested, for use by a new UI prototype. It creates a new URL endpoint which sends one video/mp4 media segment per key frame, with the dependent frames included. This means there will be about one key frame interval of latency (typically about a second). This seems hard to avoid, as mentioned in issue #59. --- Cargo.lock | 33 +++++++++++++ base/error.rs | 4 ++ db/Cargo.toml | 1 + db/db.rs | 126 ++++++++++++++++++++++++++++++++++-------------- db/writer.rs | 44 +++++++++++++---- design/api.md | 71 +++++++++++++++++++++++++++ src/body.rs | 1 - src/cmds/run.rs | 2 + src/web.rs | 103 +++++++++++++++++++++++++++++++++++++-- 9 files changed, 333 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53248dd..181b5ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -838,6 +838,7 @@ dependencies = [ "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "moonfire-base 0.0.1", "mylog 0.1.0 (git+https://github.com/scottlamb/mylog)", + "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.10.16 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1014,6 +1015,16 @@ dependencies = [ "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "odds" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rawslice 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unchecked-index 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "openssl" version = "0.10.16" @@ -1292,6 +1303,19 @@ dependencies = [ "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rawpointer" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rawslice" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rayon" version = "0.8.2" @@ -1905,6 +1929,11 @@ name = "ucd-util" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "unchecked-index" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "unicase" version = "1.4.2" @@ -2169,6 +2198,7 @@ dependencies = [ "checksum num-rational 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4e96f040177bb3da242b5b1ecf3f54b5d5af3efbbfb18608977a5d2767b22f10" "checksum num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0b3a5d7cc97d6d30d8b9bc8fa19bf45349ffe46241e8816f50f62f6d6aaabee1" "checksum num_cpus 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5a69d464bdc213aaaff628444e99578ede64e9c854025aa43b9796530afa9238" +"checksum odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a9a18d7081eb052145753e982d7b8de495f15f74636d0d963f09116581eab665" "checksum openssl 0.10.16 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7bd7ca4cce6dbdc77e7c1230682740d307d1218a87fb0349a571272be749f9" "checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" "checksum openssl-sys 0.9.40 (registry+https://github.com/rust-lang/crates.io-index)" = "1bb974e77de925ef426b6bc82fce15fd45bdcbeb5728bffcfc7cdeeb7ce1c2d6" @@ -2200,6 +2230,8 @@ dependencies = [ "checksum rand_isaac 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08" "checksum rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "086bd09a33c7044e56bb44d5bdde5a60e7f119a9e95b0775f545de759a32fe05" "checksum rand_xorshift 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "effa3fcaa47e18db002bdde6060944b6d2f9cfd8db471c30e873448ad9187be3" +"checksum rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ebac11a9d2e11f2af219b8b8d833b76b1ea0e054aa0e8d8e9e4cbde353bdf019" +"checksum rawslice 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "22b23b9f57ea250c6db4b21e2897b43ff08209217ca8260469fae6c0f9ad7e25" "checksum rayon 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b614fe08b6665cb9a231d07ac1364b0ef3cb3698f1239ee0c4c3a88a524f54c8" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" "checksum redox_syscall 0.1.44 (registry+https://github.com/rust-lang/crates.io-index)" = "a84bcd297b87a545980a2d25a0beb72a1f490c31f0a9fde52fca35bfbb1ceb70" @@ -2264,6 +2296,7 @@ dependencies = [ "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535c204ee4d8434478593480b8f86ab45ec9aae0e83c568ca81abf0fd0e88f86" +"checksum unchecked-index 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c" "checksum unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" "checksum unicase 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d3218ea14b4edcaccfa0df0a64a3792a2c32cc706f1b336e48867f9d3147f90" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" diff --git a/base/error.rs b/base/error.rs index c089a76..28ae79d 100644 --- a/base/error.rs +++ b/base/error.rs @@ -40,6 +40,10 @@ impl Error { pub fn kind(&self) -> ErrorKind { *self.inner.get_context() } + + pub fn compat(self) -> failure::Compat> { + self.inner.compat() + } } impl Fail for Error { diff --git a/db/Cargo.toml b/db/Cargo.toml index 610ab22..8528606 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -23,6 +23,7 @@ libpasta = "0.1.0-rc2" log = "0.4" lru-cache = "0.1" mylog = { git = "https://github.com/scottlamb/mylog" } +odds = { version = "0.3.1", features = ["std-vec"] } openssl = "0.10" parking_lot = { version = "0.7", features = [] } protobuf = "2.0" diff --git a/db/db.rs b/db/db.rs index c0d8d27..eeabe53 100644 --- a/db/db.rs +++ b/db/db.rs @@ -395,7 +395,6 @@ impl ::std::fmt::Display for StreamType { pub const ALL_STREAM_TYPES: [StreamType; 2] = [StreamType::MAIN, StreamType::SUB]; -#[derive(Clone, Debug)] pub struct Stream { pub id: i32, pub camera_id: i32, @@ -445,6 +444,20 @@ pub struct Stream { /// The number of recordings in `uncommitted` which are synced and ready to commit. synced_recordings: usize, + + on_live_segment: Vec bool + Send>>, +} + +/// Bounds of a single keyframe and the frames dependent on it. +/// This is used for live stream recordings. The stream id should already be known to the +/// subscriber. +#[derive(Clone, Debug)] +pub struct LiveSegment { + pub recording: i32, + + /// The pts, relative to the start of the recording, of the start and end of this live segment, + /// in 90kHz units. + pub off_90k: Range, } #[derive(Clone, Debug, Default)] @@ -592,7 +605,7 @@ pub struct LockedDatabase { flush_count: usize, /// If the database is open in read-write mode, the information about the current Open row. - open: Option, + pub open: Option, /// The monotonic time when the database was opened (whether in read-write mode or read-only /// mode). @@ -611,8 +624,8 @@ pub struct LockedDatabase { /// Represents a row of the `open` database table. #[derive(Copy, Clone, Debug)] -pub(crate) struct Open { - pub(crate) id: u32, +pub struct Open { + pub id: u32, pub(crate) uuid: Uuid, } @@ -638,7 +651,7 @@ impl ::std::fmt::Display for CompositeId { /// structs. struct StreamStateChanger { sids: [Option; 2], - streams: Vec<(i32, Option)>, + streams: Vec<(i32, Option<(i32, StreamType, StreamChange)>)>, } impl StreamStateChanger { @@ -651,6 +664,7 @@ impl StreamStateChanger { let mut streams = Vec::with_capacity(2); let existing_streams = existing.map(|e| e.streams).unwrap_or_default(); for (i, ref mut sc) in change.streams.iter_mut().enumerate() { + let type_ = StreamType::from_index(i).unwrap(); let mut have_data = false; if let Some(sid) = existing_streams[i] { let s = streams_by_id.get(&sid).unwrap(); @@ -694,14 +708,8 @@ impl StreamStateChanger { bail!("missing stream {}", sid); } sids[i] = Some(sid); - let s = (*s).clone(); - streams.push((sid, Some(Stream { - sample_file_dir_id: sc.sample_file_dir_id, - rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), - record: sc.record, - flush_if_sec: sc.flush_if_sec, - ..s - }))); + let sc = mem::replace(*sc, StreamChange::default()); + streams.push((sid, Some((camera_id, type_, sc)))); } } else { if sc.rtsp_path.is_empty() && sc.sample_file_dir_id.is_none() && !sc.record { @@ -715,7 +723,6 @@ impl StreamStateChanger { values (:camera_id, :sample_file_dir_id, :type, :rtsp_path, :record, 0, :flush_if_sec, 1) "#)?; - let type_ = StreamType::from_index(i).unwrap(); stmt.execute_named(&[ (":camera_id", &camera_id), (":sample_file_dir_id", &sc.sample_file_dir_id), @@ -726,26 +733,8 @@ impl StreamStateChanger { ])?; let id = tx.last_insert_rowid() as i32; sids[i] = Some(id); - streams.push((id, Some(Stream { - id, - type_, - camera_id, - sample_file_dir_id: sc.sample_file_dir_id, - rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), - retain_bytes: 0, - flush_if_sec: sc.flush_if_sec, - range: None, - sample_file_bytes: 0, - to_delete: Vec::new(), - bytes_to_delete: 0, - bytes_to_add: 0, - duration: recording::Duration(0), - days: BTreeMap::new(), - record: sc.record, - next_recording_id: 1, - uncommitted: VecDeque::new(), - synced_recordings: 0, - }))); + let sc = mem::replace(*sc, StreamChange::default()); + streams.push((id, Some((camera_id, type_, sc)))); } } Ok(StreamStateChanger { @@ -760,9 +749,37 @@ impl StreamStateChanger { for (id, stream) in self.streams.drain(..) { use ::std::collections::btree_map::Entry; match (streams_by_id.entry(id), stream) { - (Entry::Vacant(e), Some(new)) => { e.insert(new); }, + (Entry::Vacant(e), Some((camera_id, type_, mut sc))) => { + e.insert(Stream { + id, + type_, + camera_id, + sample_file_dir_id: sc.sample_file_dir_id, + rtsp_path: mem::replace(&mut sc.rtsp_path, String::new()), + retain_bytes: 0, + flush_if_sec: sc.flush_if_sec, + range: None, + sample_file_bytes: 0, + to_delete: Vec::new(), + bytes_to_delete: 0, + bytes_to_add: 0, + duration: recording::Duration(0), + days: BTreeMap::new(), + record: sc.record, + next_recording_id: 1, + uncommitted: VecDeque::new(), + synced_recordings: 0, + on_live_segment: Vec::new(), + }); + }, (Entry::Vacant(_), None) => {}, - (Entry::Occupied(mut e), Some(new)) => { e.insert(new); }, + (Entry::Occupied(e), Some((_, _, sc))) => { + let e = e.into_mut(); + e.sample_file_dir_id = sc.sample_file_dir_id; + e.rtsp_path = sc.rtsp_path; + e.record = sc.record; + e.flush_if_sec = sc.flush_if_sec; + }, (Entry::Occupied(e), None) => { e.remove(); }, }; } @@ -846,6 +863,40 @@ impl LockedDatabase { Ok(()) } + /// Registers a callback to run on every live segment immediately after it's recorded. + /// The callback is run with the database lock held, so it must not call back into the database + /// or block. The callback should return false to unregister. + pub fn watch_live(&mut self, stream_id: i32, cb: Box bool + Send>) + -> Result<(), Error> { + let s = match self.streams_by_id.get_mut(&stream_id) { + None => bail!("no such stream {}", stream_id), + Some(s) => s, + }; + s.on_live_segment.push(cb); + Ok(()) + } + + /// Clears all watches on all streams. + /// Normally watches are self-cleaning: when a segment is sent, the callback returns false if + /// it is no longer interested (typically because hyper has just noticed the client is no + /// longer connected). This doesn't work when the system is shutting down and nothing more is + /// sent, though. + pub fn clear_watches(&mut self) { + for (_, s) in &mut self.streams_by_id { + s.on_live_segment.clear(); + } + } + + pub(crate) fn send_live_segment(&mut self, stream: i32, l: LiveSegment) -> Result<(), Error> { + let s = match self.streams_by_id.get_mut(&stream) { + None => bail!("no such stream {}", stream), + Some(s) => s, + }; + use odds::vec::VecExt; + s.on_live_segment.retain_mut(|cb| cb(l.clone())); + Ok(()) + } + /// Helper for `DatabaseGuard::flush()` and `Database::drop()`. /// /// The public API is in `DatabaseGuard::flush()`; it supplies the `Clocks` to this function. @@ -978,7 +1029,7 @@ impl LockedDatabase { // handlers given that it didn't add them. pub fn clear_on_flush(&mut self) { self.on_flush.clear(); - } + } /// Opens the given sample file directories. /// @@ -1427,6 +1478,7 @@ impl LockedDatabase { record: row.get_checked(8)?, uncommitted: VecDeque::new(), synced_recordings: 0, + on_live_segment: Vec::new(), }); c.streams[type_.index()] = Some(id); } diff --git a/db/writer.rs b/db/writer.rs index 064c2cf..1ec8153 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -563,6 +563,11 @@ struct InnerWriter { r: Arc>, e: recording::SampleIndexEncoder, id: CompositeId, + + /// The pts, relative to the start of this segment and in 90kHz units, up until which live + /// segments have been sent out. Initially 0. + completed_live_segment_off_90k: i32, + hasher: hash::Hasher, /// The start time of this segment, based solely on examining the local clock after frames in @@ -636,7 +641,7 @@ impl ClockAdjuster { #[derive(Copy, Clone)] struct UnflushedSample { local_time: recording::Time, - pts_90k: i64, + pts_90k: i64, // relative to the start of the stream, not a single recording. len: i32, is_key: bool, } @@ -650,6 +655,7 @@ struct PreviousWriter { } impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { + /// `db` must not be locked. pub fn new(dir: &'a D, db: &'a db::Database, channel: &'a SyncerChannel, stream_id: i32, video_sample_entry_id: i32) -> Self { Writer { @@ -686,6 +692,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { r, e: recording::SampleIndexEncoder::new(), id, + completed_live_segment_off_90k: 0, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), @@ -716,7 +723,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { // We must restore it on all success or error paths. if let Some(unflushed) = w.unflushed_sample.take() { - let duration = (pts_90k - unflushed.pts_90k) as i32; + let duration = (pts_90k - unflushed.pts_90k as i64) as i32; if duration <= 0 { // Restore invariant. w.unflushed_sample = Some(unflushed); @@ -724,7 +731,17 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { unflushed.pts_90k, pts_90k); } let duration = w.adjuster.adjust(duration); - w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); + let d = w.add_sample(duration, unflushed.len, unflushed.is_key, unflushed.local_time); + + // If the sample `write` was called on is a key frame, then the prior frames (including + // the one we just flushed) represent a live segment. Send it out. + if is_key { + self.db.lock().send_live_segment(self.stream_id, db::LiveSegment { + recording: w.id.recording(), + off_90k: w.completed_live_segment_off_90k .. d, + }).unwrap(); + w.completed_live_segment_off_90k = d; + } } let mut remaining = pkt; while !remaining.is_empty() { @@ -747,7 +764,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { pub fn close(&mut self, next_pts: Option) { self.state = match mem::replace(&mut self.state, WriterState::Unopened) { WriterState::Open(w) => { - let prev = w.close(self.channel, next_pts); + let prev = w.close(self.channel, next_pts, self.db, self.stream_id); WriterState::Closed(prev) }, s => s, @@ -756,8 +773,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { } impl InnerWriter { + /// Returns the total duration of the `RecordingToInsert` (needed for live view path). fn add_sample(&mut self, duration_90k: i32, bytes: i32, is_key: bool, - pkt_local_time: recording::Time) { + pkt_local_time: recording::Time) -> i32 { let mut l = self.r.lock(); self.e.add_sample(duration_90k, bytes, is_key, &mut l); let new = pkt_local_time - recording::Duration(l.duration_90k as i64); @@ -765,9 +783,11 @@ impl InnerWriter { if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust. l.start = self.local_start; } + l.duration_90k } - fn close(mut self, channel: &SyncerChannel, next_pts: Option) -> PreviousWriter { + fn close(mut self, channel: &SyncerChannel, next_pts: Option, + db: &db::Database, stream_id: i32) -> PreviousWriter { let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); let (last_sample_duration, flags) = match next_pts { None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), @@ -776,8 +796,14 @@ impl InnerWriter { let mut sha1_bytes = [0u8; 20]; sha1_bytes.copy_from_slice(&self.hasher.finish().unwrap()[..]); let (local_time_delta, run_offset, end); - self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key, - unflushed.local_time); + let d = self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key, + unflushed.local_time); + + // This always ends a live segment. + db.lock().send_live_segment(stream_id, db::LiveSegment { + recording: self.id.recording(), + off_90k: self.completed_live_segment_off_90k .. d, + }).unwrap(); let total_duration; { let mut l = self.r.lock(); @@ -809,7 +835,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Drop for Writer<'a, C, D> { // Swallow any error. The caller should only drop the Writer without calling close() // if there's already been an error. The caller should report that. No point in // complaining again. - let _ = w.close(self.channel, None); + let _ = w.close(self.channel, None, self.db, self.stream_id); } } } diff --git a/design/api.md b/design/api.md index f1d29fa..077bc15 100644 --- a/design/api.md +++ b/design/api.md @@ -338,6 +338,77 @@ recording segment for several reasons: A GET returns a `text/plain` debugging string for the `.mp4` generated by the same URL minus the `.txt` suffix. +### `/api/cameras///live.m4s` + +A GET returns a `multipart/mixed` sequence of parts. An extra top-level +header, `X-Open-Id`, contains the `openId` which is assigned to all recordings +in this live stream. + +Each part is a `.mp4` media segment that starts with a key frame and contains +all other frames which depend on that key frame. The following part headers +will be included: + +* `Content-Length`: as defined by HTTP +* `Content-Type`: the MIME type, including `codecs` parameter. +* `X-Recording-Id`: the ID of the recording these frames are contained in. +* `X-Time-Range`: the relative start and end times of these frames within + the recording, in the same format as `REL_START_TIME` and `REL_END_TIME` + above. + +Cameras are typically configured to have about one key frame per second, so +there will be one part per second when the stream is working. If the stream is +not connected, the HTTP GET request will wait until the stream is established, +possibly forever. + +Example request URI: + +``` +/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/live.m4s +``` + +Example response: + +``` +Content-Type: multipart/mixed; boundary=B +X-Open-Id: 42 + +--B +Content-Length: 536445 +Content-Type: video/mp4; codecs="avc1.640028" +X-Recording-Id: 5680 +X-Time-Range: 5220058-5400061 +X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb + +binary mp4 data + +--B +Content-Length: 541118 +Content-Type: video/mp4; codecs="avc1.640028" +X-Recording-Id: 5681 +X-Time-Range: 0-180002 +X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb + +binary mp4 data + +--B +Content-Length: 539195 +Content-Type: video/mp4; codecs="avc1.640028" +X-Recording-Id: 5681 +X-Time-Range: 180002-360004 +X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb + +binary mp4 data + +... +``` + +These segments are exactly the same as ones that can be retrieved at the +following URLs, respectively: + + * `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5680@42.5220058-5400061` + * `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.0-180002` + * `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.180002-360004` + ### `/api/init/.mp4` A GET returns a `.mp4` suitable for use as a [HTML5 Media Source Extensions diff --git a/src/body.rs b/src/body.rs index 04cc3f2..39e93f0 100644 --- a/src/body.rs +++ b/src/body.rs @@ -31,7 +31,6 @@ //! Tools for implementing a `http_serve::Entity` body composed from many "slices". use base::Error; -use failure::Fail; use futures::{Stream, stream}; use hyper::body::Payload; use reffers::ARefs; diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 74c1f01..d7893df 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -293,6 +293,8 @@ pub fn run() -> Result<(), Error> { } } + db.lock().clear_watches(); + info!("Waiting for HTTP requests to finish."); reactor.join().unwrap(); info!("Exiting."); diff --git a/src/web.rs b/src/web.rs index e0ba479..6219cf3 100644 --- a/src/web.rs +++ b/src/web.rs @@ -29,7 +29,7 @@ // along with this program. If not, see . use base::clock::Clocks; -use base::{ErrorKind, strutil}; +use base::{ErrorKind, ResultExt, bail_t, strutil}; use crate::body::{Body, BoxedError}; use crate::json; use crate::mp4; @@ -77,6 +77,7 @@ enum Path { StreamRecordings(Uuid, db::StreamType), // "/api/cameras///recordings" StreamViewMp4(Uuid, db::StreamType, bool), // "/api/cameras///view.mp4{.txt}" StreamViewMp4Segment(Uuid, db::StreamType, bool), // "/api/cameras///view.m4s{.txt}" + StreamLiveMp4Segments(Uuid, db::StreamType), // "/api/cameras///live.m4s" Login, // "/api/login" Logout, // "/api/logout" Static, // (anything that doesn't start with "/api/") @@ -149,6 +150,7 @@ impl Path { "/view.mp4.txt" => Path::StreamViewMp4(uuid, type_, true), "/view.m4s" => Path::StreamViewMp4Segment(uuid, type_, false), "/view.m4s.txt" => Path::StreamViewMp4Segment(uuid, type_, true), + "/live.m4s" => Path::StreamLiveMp4Segments(uuid, type_), _ => Path::NotFound, } } @@ -378,19 +380,19 @@ impl ServiceInner { } fn stream_view_mp4(&self, req: &Request<::hyper::Body>, uuid: Uuid, - stream_type_: db::StreamType, mp4_type_: mp4::Type, debug: bool) + stream_type: db::StreamType, mp4_type: mp4::Type, debug: bool) -> ResponseResult { let stream_id = { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, format!("no such camera {}", uuid)))?; - camera.streams[stream_type_.index()] + camera.streams[stream_type.index()] .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, format!("no such stream {}/{}", uuid, - stream_type_)))? + stream_type)))? }; - let mut builder = mp4::FileBuilder::new(mp4_type_); + let mut builder = mp4::FileBuilder::new(mp4_type); if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { let (key, value) = (key.borrow(), value.borrow()); @@ -797,6 +799,91 @@ impl Service { .map_err(|e| internal_server_err(format_err!("unable to read request body: {}", e)))) } + + fn stream_live_m4s(&self, _req: &Request<::hyper::Body>, uuid: Uuid, + stream_type: db::StreamType) -> ResponseResult { + let stream_id; + let open_id; + let (sub_tx, sub_rx) = futures::sync::mpsc::unbounded(); + { + let mut db = self.0.db.lock(); + open_id = match db.open { + None => return Err(plain_response( + StatusCode::PRECONDITION_FAILED, + "database is read-only; there are no live streams")), + Some(o) => o.id, + }; + let camera = db.get_camera(uuid) + .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, + format!("no such camera {}", uuid)))?; + stream_id = camera.streams[stream_type.index()] + .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, + format!("no such stream {}/{}", uuid, + stream_type)))?; + db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok())) + .expect("stream_id refed by camera"); + } + let inner = self.0.clone(); + let body: crate::body::BodyStream = Box::new(sub_rx + .map_err(|()| unreachable!()) + .and_then(move |live| -> Result<_, base::Error> { + let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); + let mut vse_id = None; + { + let db = inner.db.lock(); + let mut rows = 0; + db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, + &mut |r| { + rows += 1; + let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id) + .unwrap(); + vse_id = Some(strutil::hex(&vse.sha1)); + builder.append(&db, r, live.off_90k.clone())?; + Ok(()) + }).err_kind(base::ErrorKind::Unknown)?; + if rows != 1 { + bail_t!(Internal, "unable to find {:?}", live); + } + } + let vse_id = vse_id.unwrap(); + use http_serve::Entity; + let mp4 = builder.build(inner.db.clone(), inner.dirs_by_stream_id.clone())?; + let mut hdrs = http::header::HeaderMap::new(); + mp4.add_headers(&mut hdrs); + //Ok(format!("{:?}\n\n", mp4).into()) + let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap(); + let len = mp4.len(); + use futures::stream::once; + let hdr = format!( + "--B\r\n\ + Content-Length: {}\r\n\ + Content-Type: {}\r\n\ + X-Recording-Id: {}\r\n\ + X-Time-Range: {}-{}\r\n\ + X-Video-Sample-Entry-Sha1: {}\r\n\r\n", + len, + mime_type.to_str().unwrap(), + live.recording, + live.off_90k.start, + live.off_90k.end, + &vse_id); + let v: Vec = vec![ + Box::new(once(Ok(hdr.into()))), + mp4.get_range(0 .. len), + Box::new(once(Ok("\r\n\r\n".into()))) + ]; + Ok(futures::stream::iter_ok::<_, crate::body::BoxedError>(v)) + }) + .map_err(|e| Box::new(e.compat())) + .flatten() + .flatten()); + let body: Body = body.into(); + Ok(http::Response::builder() + .header("X-Open-Id", open_id.to_string()) + .header("Content-Type", "multipart/mixed; boundary=B") + .body(body) + .unwrap()) + } } impl ::hyper::service::Service for Service { @@ -851,6 +938,9 @@ impl ::hyper::service::Service for Service { wrap_r(true, self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::MediaSegment, debug)) }, + Path::StreamLiveMp4Segments(uuid, type_) => { + wrap_r(true, self.stream_live_m4s(&req, uuid, type_)) + }, Path::NotFound => wrap(true, future::err(not_found("path not understood"))), Path::Login => wrap(true, self.with_form_body(req).and_then({ let s = self.clone(); @@ -1000,6 +1090,9 @@ mod tests { assert_eq!( Path::decode("/api/cameras/35144640-ff1e-4619-b0d5-4c74c185741c/main/view.m4s.txt"), Path::StreamViewMp4Segment(cam_uuid, db::StreamType::MAIN, true)); + assert_eq!( + Path::decode("/api/cameras/35144640-ff1e-4619-b0d5-4c74c185741c/main/live.m4s"), + Path::StreamLiveMp4Segments(cam_uuid, db::StreamType::MAIN)); assert_eq!( Path::decode("/api/cameras/35144640-ff1e-4619-b0d5-4c74c185741c/main/junk"), Path::NotFound);