diff --git a/Cargo.lock b/Cargo.lock index 68fba14..c8c6090 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -984,6 +984,15 @@ dependencies = [ "autocfg 1.0.0", ] +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +dependencies = [ + "bytes", +] + [[package]] name = "iovec" version = "0.1.4" @@ -1304,6 +1313,7 @@ dependencies = [ "tempdir", "time 0.1.42", "tokio", + "tokio-tungstenite", "url", "uuid", ] @@ -2190,6 +2200,18 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "sha-1" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +dependencies = [ + "block-buffer", + "digest", + "fake-simd", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.8.1" @@ -2484,6 +2506,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b8fe88007ebc363512449868d7da4389c9400072a3f666f212c7280082882a" +dependencies = [ + "futures", + "log", + "pin-project", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.2.0" @@ -2519,6 +2554,25 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +[[package]] +name = "tungstenite" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e" +dependencies = [ + "base64 0.11.0", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "rand 0.7.3", + "sha-1", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.11.2" @@ -2599,6 +2653,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + [[package]] name = "uuid" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index c42482e..bdc8442 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ serde_json = "1.0" smallvec = "1.0" time = "0.1" tokio = { version = "0.2.0", features = ["blocking", "macros", "rt-threaded", "signal"] } +tokio-tungstenite = "0.10.1" url = "2.1.1" uuid = { version = "0.8", features = ["serde", "std", "v4"] } diff --git a/design/api.md b/design/api.md index 75446c8..d3fe7b7 100644 --- a/design/api.md +++ b/design/api.md @@ -416,17 +416,21 @@ URL minus the `.txt` suffix. ### `GET /api/cameras///live.m4s` -Returns a `multipart/mixed` sequence of parts. An extra top-level header, -`X-Open-Id`, contains the `openId` which is assigned to all recordings in this -live stream. +Initiate a WebSocket stream for chunks of video. Expects the standard +WebSocket headers as described in [RFC 6455][rfc-6455] and (if authentication +is required) the `s` cookie. -Each part is a `.mp4` media segment that starts with a key frame and contains -all other frames which depend on that key frame. The following part headers -will be included: +The server will send a sequence of binary messages. Each message corresponds +to one run (GOP) of video: a key (IDR) frame and all other frames which depend +on it. These are encoded as a `.mp4` media segment. The following headers will +be included: -* `Content-Length`: as defined by HTTP -* `Content-Type`: the MIME type, including `codecs` parameter. -* `X-Recording-Id`: the ID of the recording these frames are contained in. +* `X-Recording-Id`: the open id, a period, and the recording id of the + recording these frames belong to. +* `X-Recording-Start`: the timestamp (in Moonfire NVR's usual 90,000ths + of a second) of the start of the recording. Note that if the recording + is "unanchored" (as described in `GET /api/.../recordings`), the + recording's start time may change before it is completed. * `X-Time-Range`: the relative start and end times of these frames within the recording, in the same format as `REL_START_TIME` and `REL_END_TIME` above. @@ -442,40 +446,36 @@ Example request URI: /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/live.m4s ``` -Example response: +Example binary message sequence: ``` -Content-Type: multipart/mixed; boundary=B -X-Open-Id: 42 - ---B -Content-Length: 536445 Content-Type: video/mp4; codecs="avc1.640028" -X-Recording-Id: 5680 +X-Recording-Id: 42.5680 +X-Recording-Start: 130985461191810 X-Time-Range: 5220058-5400061 X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb binary mp4 data +``` ---B -Content-Length: 541118 +``` Content-Type: video/mp4; codecs="avc1.640028" -X-Recording-Id: 5681 +X-Recording-Id: 42.5681 +X-Recording-Start: 130985461191822 X-Time-Range: 0-180002 X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb binary mp4 data +``` ---B -Content-Length: 539195 +``` Content-Type: video/mp4; codecs="avc1.640028" -X-Recording-Id: 5681 +X-Recording-Id: 42.5681 +X-Recording-Start: 130985461191822 X-Time-Range: 180002-360004 X-Video-Sample-Entry-Sha1: 25fad1b92c344dadc0473a783dff957b0d7d56bb binary mp4 data - -... ``` These segments are exactly the same as ones that can be retrieved at the @@ -485,6 +485,13 @@ following URLs, respectively: * `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.0-180002` * `/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.m4s?s=5681@42.180002-360004` +Note: an earlier version of this API used a `multipart/mixed` segment instead, +compatible with the [multipart-stream-js][multipart-stream-js] library. The +problem with this approach is that browsers have low limits on the number of +active HTTP/1.1 connections: six in Chrome's case. The WebSocket limit is much +higher (256), allowing browser-side Javascript to stream all active camera +streams simultaneously as well as making other simultaneous HTTP requests. + ### `GET /api/init/.mp4` Returns a `.mp4` suitable for use as a [HTML5 Media Source Extensions @@ -660,3 +667,5 @@ Response: [media-segment]: https://w3c.github.io/media-source/isobmff-byte-stream-format.html#iso-media-segments [init-segment]: https://w3c.github.io/media-source/isobmff-byte-stream-format.html#iso-init-segments [rfc-6381]: https://tools.ietf.org/html/rfc6381 +[rfc-6455]: https://tools.ietf.org/html/rfc6455 +[multipart-mixed-js]: https://github.com/scottlamb/multipart-mixed-js diff --git a/guide/secure.md b/guide/secure.md index 6e1a87b..44c0655 100644 --- a/guide/secure.md +++ b/guide/secure.md @@ -198,6 +198,11 @@ upstream moonfire { server 127.0.0.1:8080; } +map $http_upgrade $connection_upgrade { + default Upgrade; + '' close; +} + server { root /var/www/html; index index.html index.htm index.nginx-debian.html; @@ -209,6 +214,10 @@ server { # try_files $uri $uri/ =404; } + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-Real-IP $remote_addr; proxy_set_header Host $http_host; @@ -256,5 +265,9 @@ Login with the credentials you added through `moonfire-nvr config` in the [previous guide](install.md). You should see your username and "logout" in the upper-right corner of the web interface. +Also try the live streaming feature, which requires WebSockets. The nginx +configuration above includes sections derived from nginx's [NGINX as a +WebSocket Proxy](https://www.nginx.com/blog/websocket-nginx/) doc. + If it doesn't work as expected, re-read this guide, then open an issue on github for help. diff --git a/src/mp4.rs b/src/mp4.rs index 225a0c6..bb220e0 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -1510,6 +1510,28 @@ impl FileInner { #[derive(Clone)] pub struct File(Arc); +impl File { + pub async fn append_into_vec(self, v: &mut Vec) -> Result<(), Error> { + use http_serve::Entity; + v.reserve(usize::try_from(self.len()) + .map_err(|_| format_err_t!(InvalidArgument, "{}-byte mp4 is too big to send over WebSockets!", + self.len()))?); + let mut b = std::pin::Pin::from(self.get_range(0 .. self.len())); + loop { + use futures::stream::StreamExt; + match b.next().await { + Some(r) => { + let chunk = r + .map_err(failure::Error::from_boxed_compat) + .err_kind(ErrorKind::Unknown)?; + v.extend_from_slice(chunk.bytes()) + }, + None => return Ok(()), + } + } + } +} + impl http_serve::Entity for File { type Data = Chunk; type Error = BoxedError; diff --git a/src/web.rs b/src/web.rs index 554da0c..b50abe1 100644 --- a/src/web.rs +++ b/src/web.rs @@ -29,7 +29,7 @@ // along with this program. If not, see . use base::clock::Clocks; -use base::{ErrorKind, ResultExt, bail_t, strutil}; +use base::{ErrorKind, bail_t, strutil}; use bytes::Bytes; use crate::body::{Body, BoxedError}; use crate::json; @@ -42,8 +42,9 @@ use db::{auth, recording}; use db::dir::SampleFileDir; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; +use futures::sink::SinkExt; use futures::future::{self, Future, TryFutureExt}; -use futures::stream::{Stream, StreamExt, TryStreamExt}; +use futures::stream::StreamExt; use http::{Request, Response, status::StatusCode}; use http_serve; use http::header::{self, HeaderValue}; @@ -59,6 +60,7 @@ use std::ops::Range; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; +use tokio_tungstenite::tungstenite; use url::form_urlencoded; use uuid::Uuid; @@ -878,11 +880,12 @@ impl Service { } } - fn stream_live_m4s(&self, _req: &Request<::hyper::Body>, caller: Caller, uuid: Uuid, + fn stream_live_m4s(&self, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid, stream_type: db::StreamType) -> ResponseResult { if !caller.permissions.view_video { return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required")); } + let stream_id; let open_id; let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); @@ -904,65 +907,93 @@ impl Service { db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok())) .expect("stream_id refed by camera"); } - let inner = self.0.clone(); - let body = sub_rx - .map(move |live| -> Result<_, base::Error> { - let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); - let mut vse_id = None; - { - let db = inner.db.lock(); - let mut rows = 0; - db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, - &mut |r| { - rows += 1; - let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id) - .unwrap(); - vse_id = Some(strutil::hex(&vse.sha1)); - builder.append(&db, r, live.off_90k.clone())?; - Ok(()) - }).err_kind(base::ErrorKind::Unknown)?; - if rows != 1 { - bail_t!(Internal, "unable to find {:?}", live); - } - } - let vse_id = vse_id.unwrap(); - use http_serve::Entity; - let mp4 = builder.build(inner.db.clone(), inner.dirs_by_stream_id.clone())?; - let mut hdrs = http::header::HeaderMap::new(); - mp4.add_headers(&mut hdrs); - let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap(); - let len = mp4.len(); - use futures::stream::once; - let hdr = format!( - "--B\r\n\ - Content-Length: {}\r\n\ - Content-Type: {}\r\n\ - X-Recording-Id: {}\r\n\ - X-Time-Range: {}-{}\r\n\ - X-Video-Sample-Entry-Sha1: {}\r\n\r\n", - len, - mime_type.to_str().unwrap(), - live.recording, - live.off_90k.start, - live.off_90k.end, - &vse_id); - let v: Vec> = vec![ - Box::pin(once(futures::future::ok(hdr.into()))), - Pin::from(mp4.get_range(0 .. len)), - Box::pin(once(futures::future::ok("\r\n\r\n".into()))) - ]; - Ok(futures::stream::iter(v).flatten()) - }); - let body = body.map_err::(|e| Box::new(e.compat())); - let _: &dyn Stream> = &body; - let body = body.try_flatten(); - let body: crate::body::BodyStream = Box::new(body); - let body: Body = body.into(); - Ok(http::Response::builder() - .header("X-Open-Id", open_id.to_string()) - .header("Content-Type", "multipart/mixed; boundary=B") - .body(body) - .unwrap()) + + let (parts, body) = req.into_parts(); + let req = Request::from_parts(parts, ()); + let response = tungstenite::handshake::server::create_response(&req) + .map_err(|e| bad_req(e.to_string()))?; + let (parts, ()) = response.into_parts(); + + tokio::spawn(self.clone().stream_live_m4s_ws(stream_id, open_id, body, sub_rx)); + + Ok(Response::from_parts(parts, Body::from(""))) + } + + async fn stream_live_m4s_ws( + self, stream_id: i32, open_id: u32, body: hyper::Body, + mut sub_rx: futures::channel::mpsc::UnboundedReceiver) { + let upgraded = match body.on_upgrade().await { + Ok(u) => u, + Err(e) => { + warn!("Unable to upgrade stream to websocket: {}", e); + return; + }, + }; + let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ).await; + loop { + let live = match sub_rx.next().await { + Some(l) => l, + None => return, + }; + if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await { + info!("Dropping WebSocket after error: {}", e); + return; + } + } + } + + async fn stream_live_m4s_chunk( + &self, open_id: u32, stream_id: i32, + ws: &mut tokio_tungstenite::WebSocketStream, + live: db::LiveSegment) -> Result<(), Error> { + let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); + let mut vse_id = None; + let mut start = None; + { + let db = self.0.db.lock(); + let mut rows = 0; + db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| { + rows += 1; + let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id) + .unwrap(); + vse_id = Some(strutil::hex(&vse.sha1)); + start = Some(r.start); + builder.append(&db, r, live.off_90k.clone())?; + Ok(()) + })?; + if rows != 1 { + bail_t!(Internal, "unable to find {:?}", live); + } + } + let vse_id = vse_id.unwrap(); + let start = start.unwrap(); + use http_serve::Entity; + let mp4 = builder.build(self.0.db.clone(), self.0.dirs_by_stream_id.clone())?; + let mut hdrs = http::header::HeaderMap::new(); + mp4.add_headers(&mut hdrs); + let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap(); + let hdr = format!( + "Content-Type: {}\r\n\ + X-Recording-Start: {}\r\n\ + X-Recording-Id: {}.{}\r\n\ + X-Time-Range: {}-{}\r\n\ + X-Video-Sample-Entry-Sha1: {}\r\n\r\n", + mime_type.to_str().unwrap(), + start.0, + open_id, + live.recording, + live.off_90k.start, + live.off_90k.end, + &vse_id); + let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/; + mp4.append_into_vec(&mut v).await?; + //let v = Pin::into_inner(); + ws.send(tungstenite::Message::Binary(v)).await?; + Ok(()) } fn signals(&self, req: Request, caller: Caller) @@ -1023,7 +1054,7 @@ impl Service { mp4::Type::MediaSegment, debug)) }, Path::StreamLiveMp4Segments(uuid, type_) => { - wrap_r(true, self.stream_live_m4s(&req, caller, uuid, type_)) + wrap_r(true, self.stream_live_m4s(req, caller, uuid, type_)) }, Path::NotFound => wrap(true, future::err(not_found("path not understood"))), Path::Login => wrap(true, with_json_body(req).and_then({