diff --git a/Cargo.lock b/Cargo.lock index e74c34e..2a6cb15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -843,7 +843,7 @@ dependencies = [ [[package]] name = "http-serve" version = "0.2.2" -source = "git+https://github.com/scottlamb/http-serve?branch=dir#be4a4039b0bf70c951ee56e2d08d63d48dd5dbb3" +source = "git+https://github.com/scottlamb/http-serve?branch=dir#efde86035aedf6c623c11d1125aa256a3e99e6a2" dependencies = [ "bytes", "flate2", diff --git a/src/cmds/run.rs b/src/cmds/run.rs index 6e3a35c..01e6498 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -39,7 +39,6 @@ use futures::future::FutureExt; use hyper::service::{make_service_fn, service_fn}; use log::{info, warn}; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; @@ -188,13 +187,13 @@ pub async fn run(args: &Args) -> Result<(), Error> { let time_zone_name = resolve_zone()?; info!("Resolved timezone: {}", &time_zone_name); - let s = web::Service::new(web::Config { + let svc = Arc::new(web::Service::new(web::Config { db: db.clone(), ui_dir: Some(&args.ui_dir), allow_unauthenticated_permissions: args.allow_unauthenticated_permissions.clone(), trust_forward_hdrs: args.trust_forward_hdrs, time_zone_name, - })?; + })?); // Start a streamer for each stream. let shutdown_streamers = Arc::new(AtomicBool::new(false)); @@ -267,8 +266,8 @@ pub async fn run(args: &Args) -> Result<(), Error> { // Start the web interface. let make_svc = make_service_fn(move |_conn| { futures::future::ok::<_, std::convert::Infallible>(service_fn({ - let mut s = s.clone(); - move |req| Pin::from(s.serve(req)) + let svc = Arc::clone(&svc); + move |req| Arc::clone(&svc).serve(req) })) }); let server = ::hyper::server::Server::bind(&args.http_addr) diff --git a/src/web.rs b/src/web.rs index 7082101..57e0220 100644 --- a/src/web.rs +++ b/src/web.rs @@ -31,7 +31,7 @@ use base::clock::Clocks; use base::{ErrorKind, bail_t, strutil}; use bytes::Bytes; -use crate::body::{Body, BoxedError}; +use crate::body::Body; use crate::json; use crate::mp4; use bytes::{BufMut, BytesMut}; @@ -42,7 +42,6 @@ use db::dir::SampleFileDir; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; use futures::sink::SinkExt; -use futures::future::{self, Either, Future, TryFutureExt, err}; use futures::stream::StreamExt; use http::{Request, Response, status::StatusCode}; use http::header::{self, HeaderValue}; @@ -56,15 +55,11 @@ use nom::sequence::{preceded, tuple}; use std::cmp; use std::net::IpAddr; use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; use tokio_tungstenite::tungstenite; use url::form_urlencoded; use uuid::Uuid; -type BoxedFuture = Box, BoxedError>> + - Sync + Send + 'static>; - #[derive(Debug, Eq, PartialEq)] enum Path { TopLevel, // "/api/" @@ -242,18 +237,6 @@ struct Caller { session: Option, } -impl Caller { -} - -struct ServiceInner { - db: Arc, - ui_dir: Option>, - dirs_by_stream_id: Arc>>, - time_zone_name: String, - allow_unauthenticated_permissions: Option, - trust_forward_hdrs: bool, -} - type ResponseResult = Result, Response>; fn serve_json(req: &Request, out: &T) -> ResponseResult { @@ -266,7 +249,308 @@ fn serve_json(req: &Request, out: &T) -> Ok(resp) } -impl ServiceInner { +fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool { + let mut b64 = [0u8; 32]; + session.encode_base64(&mut b64); + ::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok() +} + +/// Extracts `s` cookie from the HTTP request. Does not authenticate. +fn extract_sid(req: &Request) -> Option { + let hdr = match req.headers().get(header::COOKIE) { + None => return None, + Some(c) => c, + }; + for mut cookie in hdr.as_bytes().split(|&b| b == b';') { + if cookie.starts_with(b" ") { + cookie = &cookie[1..]; + } + if cookie.starts_with(b"s=") { + let s = &cookie[2..]; + if let Ok(s) = auth::RawSessionId::decode_base64(s) { + return Some(s); + } + } + } + None +} + +/// Extracts an `application/json` POST body from a request. +/// +/// This returns the request body as bytes rather than performing +/// deserialization. Keeping the bytes allows the caller to use a `Deserialize` +/// that borrows from the bytes. +async fn extract_json_body(req: &mut Request) -> Result> { + if *req.method() != http::method::Method::POST { + return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected")); + } + let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) { + Some(t) if t == "application/json" => true, + Some(t) if t == "application/json; charset=UTF-8" => true, + _ => false, + }; + if !correct_mime_type { + return Err(bad_req("expected application/json request body")); + } + let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty()); + hyper::body::to_bytes(b).await + .map_err(|e| internal_server_err(format_err!("unable to read request body: {}", e))) +} + +pub struct Config<'a> { + pub db: Arc, + pub ui_dir: Option<&'a std::path::Path>, + pub trust_forward_hdrs: bool, + pub time_zone_name: String, + pub allow_unauthenticated_permissions: Option, +} + +pub struct Service { + db: Arc, + ui_dir: Option>, + dirs_by_stream_id: Arc>>, + time_zone_name: String, + allow_unauthenticated_permissions: Option, + trust_forward_hdrs: bool, +} + +/// Useful HTTP `Cache-Control` values to set on successful (HTTP 200) API responses. +enum CacheControl { + /// For endpoints which have private data that may change from request to request. + PrivateDynamic, + + /// For endpoints which rarely change for a given URL. + /// E.g., a fixed segment of video. The underlying video logically never changes; there may + /// rarely be some software change to the actual bytes (which would result in a new etag) so + /// (unlike the content-hashed static content) it's not entirely immutable. + PrivateStatic, + + None, +} + +impl Service { + pub fn new(config: Config) -> Result { + let mut ui_dir = None; + if let Some(d) = config.ui_dir { + match FsDir::builder().for_path(&d) { + Err(e) => { + warn!("Unable to load --ui-dir={}; will serve no static files: {}", + d.display(), e); + }, + Ok(d) => ui_dir = Some(d), + }; + } + let dirs_by_stream_id = { + let l = config.db.lock(); + let mut d = + FnvHashMap::with_capacity_and_hasher(l.streams_by_id().len(), Default::default()); + for (&id, s) in l.streams_by_id().iter() { + let dir_id = match s.sample_file_dir_id { + Some(d) => d, + None => continue, + }; + d.insert(id, l.sample_file_dirs_by_id() + .get(&dir_id) + .unwrap() + .get()?); + } + Arc::new(d) + }; + + Ok(Service { + db: config.db, + dirs_by_stream_id, + ui_dir, + allow_unauthenticated_permissions: config.allow_unauthenticated_permissions, + trust_forward_hdrs: config.trust_forward_hdrs, + time_zone_name: config.time_zone_name, + }) + } + + fn stream_live_m4s(self: Arc, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid, + stream_type: db::StreamType) -> ResponseResult { + if !caller.permissions.view_video { + return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required")); + } + + let stream_id; + let open_id; + let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); + { + let mut db = self.db.lock(); + open_id = match db.open { + None => return Err(plain_response( + StatusCode::PRECONDITION_FAILED, + "database is read-only; there are no live streams")), + Some(o) => o.id, + }; + let camera = db.get_camera(uuid) + .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, + format!("no such camera {}", uuid)))?; + stream_id = camera.streams[stream_type.index()] + .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, + format!("no such stream {}/{}", uuid, + stream_type)))?; + db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok())) + .expect("stream_id refed by camera"); + } + + let (parts, body) = req.into_parts(); + let req = Request::from_parts(parts, ()); + let response = tungstenite::handshake::server::create_response(&req) + .map_err(|e| bad_req(e.to_string()))?; + let (parts, ()) = response.into_parts(); + + tokio::spawn(self.stream_live_m4s_ws(stream_id, open_id, body, sub_rx)); + + Ok(Response::from_parts(parts, Body::from(""))) + } + + async fn stream_live_m4s_ws( + self: Arc, stream_id: i32, open_id: u32, body: hyper::Body, + mut sub_rx: futures::channel::mpsc::UnboundedReceiver) { + let upgraded = match body.on_upgrade().await { + Ok(u) => u, + Err(e) => { + warn!("Unable to upgrade stream to websocket: {}", e); + return; + }, + }; + let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ).await; + loop { + let live = match sub_rx.next().await { + Some(l) => l, + None => return, + }; + if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await { + info!("Dropping WebSocket after error: {}", e); + return; + } + } + } + + async fn stream_live_m4s_chunk( + &self, open_id: u32, stream_id: i32, + ws: &mut tokio_tungstenite::WebSocketStream, + live: db::LiveSegment) -> Result<(), Error> { + let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); + let mut vse_id = None; + let mut start = None; + { + let db = self.db.lock(); + let mut rows = 0; + db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| { + rows += 1; + let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id) + .unwrap(); + vse_id = Some(strutil::hex(&vse.sha1)); + start = Some(r.start); + builder.append(&db, r, live.off_90k.clone())?; + Ok(()) + })?; + if rows != 1 { + bail_t!(Internal, "unable to find {:?}", live); + } + } + let vse_id = vse_id.unwrap(); + let start = start.unwrap(); + use http_serve::Entity; + let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?; + let mut hdrs = header::HeaderMap::new(); + mp4.add_headers(&mut hdrs); + let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap(); + let hdr = format!( + "Content-Type: {}\r\n\ + X-Recording-Start: {}\r\n\ + X-Recording-Id: {}.{}\r\n\ + X-Time-Range: {}-{}\r\n\ + X-Video-Sample-Entry-Sha1: {}\r\n\r\n", + mime_type.to_str().unwrap(), + start.0, + open_id, + live.recording, + live.off_90k.start, + live.off_90k.end, + &vse_id); + let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/; + mp4.append_into_vec(&mut v).await?; + //let v = Pin::into_inner(); + ws.send(tungstenite::Message::Binary(v)).await?; + Ok(()) + } + + async fn signals(&self, req: Request, caller: Caller) -> ResponseResult { + use http::method::Method; + match *req.method() { + Method::POST => self.post_signals(req, caller).await, + Method::GET | Method::HEAD => self.get_signals(&req), + _ => Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, + "POST, GET, or HEAD expected")), + } + } + + async fn serve_inner(self: Arc, req: Request<::hyper::Body>, p: Path, caller: Caller) + -> ResponseResult { + let (cache, mut response) = match p { + Path::InitSegment(sha1, debug) => { + (CacheControl::PrivateStatic, self.init_segment(sha1, debug, &req)?) + }, + Path::TopLevel => (CacheControl::PrivateDynamic, self.top_level(&req, caller)?), + Path::Request => (CacheControl::PrivateDynamic, self.request(&req)?), + Path::Camera(uuid) => (CacheControl::PrivateDynamic, self.camera(&req, uuid)?), + Path::StreamRecordings(uuid, type_) => { + (CacheControl::PrivateDynamic, self.stream_recordings(&req, uuid, type_)?) + }, + Path::StreamViewMp4(uuid, type_, debug) => { + (CacheControl::PrivateStatic, + self.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::Normal, debug)?) + }, + Path::StreamViewMp4Segment(uuid, type_, debug) => { + (CacheControl::PrivateStatic, + self.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::MediaSegment, debug)?) + }, + Path::StreamLiveMp4Segments(uuid, type_) => { + (CacheControl::PrivateDynamic, self.stream_live_m4s(req, caller, uuid, type_)?) + }, + Path::NotFound => return Err(not_found("path not understood")), + Path::Login => (CacheControl::PrivateDynamic, self.login(req).await?), + Path::Logout => (CacheControl::PrivateDynamic, self.logout(req).await?), + Path::Signals => (CacheControl::PrivateDynamic, self.signals(req, caller).await?), + Path::Static => (CacheControl::None, self.static_file(req).await?) + }; + match cache { + CacheControl::PrivateStatic => { + response.headers_mut().insert(header::CACHE_CONTROL, + HeaderValue::from_static("private, max-age=3600")); + }, + CacheControl::PrivateDynamic => { + response.headers_mut().insert(header::CACHE_CONTROL, + HeaderValue::from_static("private, no-cache")); + }, + CacheControl::None => {}, + } + Ok(response) + } + + pub async fn serve(self: Arc, req: Request<::hyper::Body>) + -> Result, std::convert::Infallible> { + let p = Path::decode(req.uri().path()); + let always_allow_unauthenticated = match p { + Path::NotFound | Path::Request | Path::Login | Path::Logout | Path::Static => true, + _ => false, + }; + debug!("request on: {}: {:?}", req.uri(), p); + let caller = match self.authenticate(&req, always_allow_unauthenticated) { + Ok(c) => c, + Err(e) => return Ok(from_base_error(e)), + }; + Ok(self.serve_inner(req, p, caller).await.unwrap_or_else(|e| e)) + } + fn top_level(&self, req: &Request<::hyper::Body>, caller: Caller) -> ResponseResult { let mut days = false; let mut camera_configs = false; @@ -514,39 +798,31 @@ impl ServiceInner { Ok(http_serve::serve(mp4, req)) } - fn static_file(&self, req: Request) - -> impl Future + 'static { - let dir = match self.ui_dir.clone() { - None => { - return Either::Left( - err(not_found("--ui-dir not configured; no static files available."))) - }, - Some(d) => d, + async fn static_file(&self, req: Request) -> ResponseResult { + let dir = self.ui_dir.clone() + .ok_or_else(|| not_found("--ui-dir not configured; no static files available."))?; + let static_req = match StaticFileRequest::parse(req.uri().path()) { + None => return Err(not_found("static file not found")), + Some(r) => r, }; - Either::Right(async move { - let static_req = match StaticFileRequest::parse(req.uri().path()) { - None => return Err(not_found("static file not found")), - Some(r) => r, - }; - let f = dir.get(static_req.path, req.headers()); - let node = f.await - .map_err(|e| if e.kind() == std::io::ErrorKind::NotFound { - not_found("no such static file") - } else { - internal_server_err(e) - })?; - let mut hdrs = http::HeaderMap::new(); - node.add_encoding_headers(&mut hdrs); - hdrs.insert(header::CACHE_CONTROL, HeaderValue::from_static(if static_req.immutable { - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#Caching_static_assets - "public, max-age=604800, immutable" + let f = dir.get(static_req.path, req.headers()); + let node = f.await + .map_err(|e| if e.kind() == std::io::ErrorKind::NotFound { + not_found("no such static file") } else { - "public" - })); - hdrs.insert(header::CONTENT_TYPE, HeaderValue::from_static(static_req.mime)); - let e = node.into_file_entity(hdrs).map_err(internal_server_err)?; - Ok(http_serve::serve(e, &req)) - }) + internal_server_err(e) + })?; + let mut hdrs = http::HeaderMap::new(); + node.add_encoding_headers(&mut hdrs); + hdrs.insert(header::CACHE_CONTROL, HeaderValue::from_static(if static_req.immutable { + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#Caching_static_assets + "public, max-age=604800, immutable" + } else { + "public" + })); + hdrs.insert(header::CONTENT_TYPE, HeaderValue::from_static(static_req.mime)); + let e = node.into_file_entity(hdrs).map_err(internal_server_err)?; + Ok(http_serve::serve(e, &req)) } fn authreq(&self, req: &Request<::hyper::Body>) -> auth::Request { @@ -588,10 +864,11 @@ impl ServiceInner { .unwrap_or(false) } - fn login(&self, req: &Request<::hyper::Body>, body: Bytes) -> ResponseResult { - let r: json::LoginRequest = serde_json::from_slice(&body) + async fn login(&self, mut req: Request<::hyper::Body>) -> ResponseResult { + let r = extract_json_body(&mut req).await?; + let r: json::LoginRequest = serde_json::from_slice(&r) .map_err(|e| bad_req(e.to_string()))?; - let authreq = self.authreq(req); + let authreq = self.authreq(&req); let host = req.headers().get(header::HOST).ok_or_else(|| bad_req("missing Host header!"))?; let host = host.as_bytes(); let domain = match memchr(b':', host) { @@ -599,7 +876,7 @@ impl ServiceInner { None => host, }.to_owned(); let mut l = self.db.lock(); - let is_secure = self.is_secure(req); + let is_secure = self.is_secure(&req); let flags = (auth::SessionFlag::HttpOnly as i32) | (auth::SessionFlag::SameSite as i32) | (auth::SessionFlag::SameSiteStrict as i32) | @@ -625,13 +902,14 @@ impl ServiceInner { .body(b""[..].into()).unwrap()) } - fn logout(&self, req: &Request, body: Bytes) -> ResponseResult { - let r: json::LogoutRequest = serde_json::from_slice(&body) + async fn logout(&self, mut req: Request) -> ResponseResult { + let r = extract_json_body(&mut req).await?; + let r: json::LogoutRequest = serde_json::from_slice(&r) .map_err(|e| bad_req(e.to_string()))?; let mut res = Response::new(b""[..].into()); - if let Some(sid) = extract_sid(req) { - let authreq = self.authreq(req); + if let Some(sid) = extract_sid(&req) { + let authreq = self.authreq(&req); let mut l = self.db.lock(); let hash = sid.hash(); let need_revoke = match l.authenticate_session(authreq.clone(), &hash) { @@ -666,12 +944,13 @@ impl ServiceInner { Ok(res) } - fn post_signals(&self, req: &Request, caller: Caller, body: Bytes) - -> ResponseResult { + async fn post_signals(&self, mut req: Request, caller: Caller) + -> ResponseResult { if !caller.permissions.update_signals { return Err(plain_response(StatusCode::UNAUTHORIZED, "update_signals required")); } - let r: json::PostSignalsRequest = serde_json::from_slice(&body) + let r = extract_json_body(&mut req).await?; + let r: json::PostSignalsRequest = serde_json::from_slice(&r) .map_err(|e| bad_req(e.to_string()))?; let mut l = self.db.lock(); let now = recording::Time::new(self.db.clocks().realtime()); @@ -684,7 +963,7 @@ impl ServiceInner { }, }; l.update_signals(start .. end, &r.signal_ids, &r.states).map_err(from_base_error)?; - serve_json(req, &json::PostSignalsResponse { + serve_json(&req, &json::PostSignalsResponse { time_90k: now.0, }) } @@ -754,327 +1033,6 @@ impl ServiceInner { } } -fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool { - let mut b64 = [0u8; 32]; - session.encode_base64(&mut b64); - ::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok() -} - -/// Extracts `s` cookie from the HTTP request. Does not authenticate. -fn extract_sid(req: &Request) -> Option { - let hdr = match req.headers().get(header::COOKIE) { - None => return None, - Some(c) => c, - }; - for mut cookie in hdr.as_bytes().split(|&b| b == b';') { - if cookie.starts_with(b" ") { - cookie = &cookie[1..]; - } - if cookie.starts_with(b"s=") { - let s = &cookie[2..]; - if let Ok(s) = auth::RawSessionId::decode_base64(s) { - return Some(s); - } - } - } - None -} - -/// Returns a future separating the request from its JSON body. -/// -/// If this is not a `POST` or the body's `Content-Type` is not -/// `application/json`, returns an appropriate error response instead. -/// -/// Use with `and_then` to chain logic which consumes the form body. -async fn with_json_body(mut req: Request) - -> Result<(Request, Bytes), Response> { - if *req.method() != http::method::Method::POST { - return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected")); - } - let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) { - Some(t) if t == "application/json" => true, - Some(t) if t == "application/json; charset=UTF-8" => true, - _ => false, - }; - if !correct_mime_type { - return Err(bad_req("expected application/json request body")); - } - let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty()); - match hyper::body::to_bytes(b).await { - Ok(b) => Ok((req, b)), - Err(e) => Err(internal_server_err(format_err!("unable to read request body: {}", e))), - } -} - - -pub struct Config<'a> { - pub db: Arc, - pub ui_dir: Option<&'a std::path::Path>, - pub trust_forward_hdrs: bool, - pub time_zone_name: String, - pub allow_unauthenticated_permissions: Option, -} - -#[derive(Clone)] -pub struct Service(Arc); - -/// Useful HTTP `Cache-Control` values to set on successful (HTTP 200) API responses. -enum CacheControl { - /// For endpoints which have private data that may change from request to request. - PrivateDynamic, - - /// For endpoints which rarely change for a given URL. - /// E.g., a fixed segment of video. The underlying video logically never changes; there may - /// rarely be some software change to the actual bytes (which would result in a new etag) so - /// (unlike the content-hashed static content) it's not entirely immutable. - PrivateStatic, - - None, -} - -impl Service { - pub fn new(config: Config) -> Result { - let mut ui_dir = None; - if let Some(d) = config.ui_dir { - match FsDir::builder().for_path(&d) { - Err(e) => { - warn!("Unable to load --ui-dir={}; will serve no static files: {}", - d.display(), e); - }, - Ok(d) => ui_dir = Some(d), - }; - } - let dirs_by_stream_id = { - let l = config.db.lock(); - let mut d = - FnvHashMap::with_capacity_and_hasher(l.streams_by_id().len(), Default::default()); - for (&id, s) in l.streams_by_id().iter() { - let dir_id = match s.sample_file_dir_id { - Some(d) => d, - None => continue, - }; - d.insert(id, l.sample_file_dirs_by_id() - .get(&dir_id) - .unwrap() - .get()?); - } - Arc::new(d) - }; - - Ok(Service(Arc::new(ServiceInner { - db: config.db, - dirs_by_stream_id, - ui_dir, - allow_unauthenticated_permissions: config.allow_unauthenticated_permissions, - trust_forward_hdrs: config.trust_forward_hdrs, - time_zone_name: config.time_zone_name, - }))) - } - - fn stream_live_m4s(&self, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid, - stream_type: db::StreamType) -> ResponseResult { - if !caller.permissions.view_video { - return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required")); - } - - let stream_id; - let open_id; - let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded(); - { - let mut db = self.0.db.lock(); - open_id = match db.open { - None => return Err(plain_response( - StatusCode::PRECONDITION_FAILED, - "database is read-only; there are no live streams")), - Some(o) => o.id, - }; - let camera = db.get_camera(uuid) - .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, - format!("no such camera {}", uuid)))?; - stream_id = camera.streams[stream_type.index()] - .ok_or_else(|| plain_response(StatusCode::NOT_FOUND, - format!("no such stream {}/{}", uuid, - stream_type)))?; - db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok())) - .expect("stream_id refed by camera"); - } - - let (parts, body) = req.into_parts(); - let req = Request::from_parts(parts, ()); - let response = tungstenite::handshake::server::create_response(&req) - .map_err(|e| bad_req(e.to_string()))?; - let (parts, ()) = response.into_parts(); - - tokio::spawn(self.clone().stream_live_m4s_ws(stream_id, open_id, body, sub_rx)); - - Ok(Response::from_parts(parts, Body::from(""))) - } - - async fn stream_live_m4s_ws( - self, stream_id: i32, open_id: u32, body: hyper::Body, - mut sub_rx: futures::channel::mpsc::UnboundedReceiver) { - let upgraded = match body.on_upgrade().await { - Ok(u) => u, - Err(e) => { - warn!("Unable to upgrade stream to websocket: {}", e); - return; - }, - }; - let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ).await; - loop { - let live = match sub_rx.next().await { - Some(l) => l, - None => return, - }; - if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await { - info!("Dropping WebSocket after error: {}", e); - return; - } - } - } - - async fn stream_live_m4s_chunk( - &self, open_id: u32, stream_id: i32, - ws: &mut tokio_tungstenite::WebSocketStream, - live: db::LiveSegment) -> Result<(), Error> { - let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment); - let mut vse_id = None; - let mut start = None; - { - let db = self.0.db.lock(); - let mut rows = 0; - db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| { - rows += 1; - let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id) - .unwrap(); - vse_id = Some(strutil::hex(&vse.sha1)); - start = Some(r.start); - builder.append(&db, r, live.off_90k.clone())?; - Ok(()) - })?; - if rows != 1 { - bail_t!(Internal, "unable to find {:?}", live); - } - } - let vse_id = vse_id.unwrap(); - let start = start.unwrap(); - use http_serve::Entity; - let mp4 = builder.build(self.0.db.clone(), self.0.dirs_by_stream_id.clone())?; - let mut hdrs = header::HeaderMap::new(); - mp4.add_headers(&mut hdrs); - let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap(); - let hdr = format!( - "Content-Type: {}\r\n\ - X-Recording-Start: {}\r\n\ - X-Recording-Id: {}.{}\r\n\ - X-Time-Range: {}-{}\r\n\ - X-Video-Sample-Entry-Sha1: {}\r\n\r\n", - mime_type.to_str().unwrap(), - start.0, - open_id, - live.recording, - live.off_90k.start, - live.off_90k.end, - &vse_id); - let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/; - mp4.append_into_vec(&mut v).await?; - //let v = Pin::into_inner(); - ws.send(tungstenite::Message::Binary(v)).await?; - Ok(()) - } - - fn signals(&self, req: Request, caller: Caller) - -> Box, Response>> + Send + Sync + 'static> { - use http::method::Method; - match *req.method() { - Method::POST => Box::new(with_json_body(req) - .and_then({ - let s = self.0.clone(); - move |(req, b)| future::ready(s.post_signals(&req, caller, b)) - })), - Method::GET | Method::HEAD => Box::new(future::ready(self.0.get_signals(&req))), - _ => Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED, - "POST, GET, or HEAD expected"))), - } - } - - pub fn serve(&mut self, req: Request<::hyper::Body>) -> BoxedFuture { - fn wrap(cache_hdr: CacheControl, r: R) -> BoxedFuture - where R: Future, Response>> + Send + Sync + 'static { - return Box::new(r.or_else(|e| futures::future::ok(e)).map_ok(move |mut r| { - match cache_hdr { - CacheControl::PrivateStatic => { - r.headers_mut().insert(header::CACHE_CONTROL, - HeaderValue::from_static("private, max-age=3600")); - }, - CacheControl::PrivateDynamic => { - r.headers_mut().insert(header::CACHE_CONTROL, - HeaderValue::from_static("private, no-cache")); - }, - CacheControl::None => {}, - } - r - })) - } - - fn wrap_r(cache_hdr: CacheControl, r: ResponseResult) - -> Box, BoxedError>> + Send + Sync + 'static> { - return wrap(cache_hdr, future::ready(r)) - } - - let p = Path::decode(req.uri().path()); - let always_allow_unauthenticated = match p { - Path::NotFound | Path::Request | Path::Login | Path::Logout | Path::Static => true, - _ => false, - }; - debug!("request on: {}: {:?}", req.uri(), p); - let caller = match self.0.authenticate(&req, always_allow_unauthenticated) { - Ok(c) => c, - Err(e) => return Box::new(future::ok(from_base_error(e))), - }; - match p { - Path::InitSegment(sha1, debug) => { - wrap_r(CacheControl::PrivateStatic, self.0.init_segment(sha1, debug, &req)) - }, - Path::TopLevel => wrap_r(CacheControl::PrivateDynamic, self.0.top_level(&req, caller)), - Path::Request => wrap_r(CacheControl::PrivateDynamic, self.0.request(&req)), - Path::Camera(uuid) => wrap_r(CacheControl::PrivateDynamic, self.0.camera(&req, uuid)), - Path::StreamRecordings(uuid, type_) => { - wrap_r(CacheControl::PrivateDynamic, self.0.stream_recordings(&req, uuid, type_)) - }, - Path::StreamViewMp4(uuid, type_, debug) => { - wrap_r(CacheControl::PrivateStatic, - self.0.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::Normal, debug)) - }, - Path::StreamViewMp4Segment(uuid, type_, debug) => { - wrap_r(CacheControl::PrivateStatic, - self.0.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::MediaSegment, - debug)) - }, - Path::StreamLiveMp4Segments(uuid, type_) => { - wrap_r(CacheControl::PrivateDynamic, self.stream_live_m4s(req, caller, uuid, type_)) - }, - Path::NotFound => wrap(CacheControl::PrivateDynamic, - future::err(not_found("path not understood"))), - Path::Login => wrap(CacheControl::PrivateDynamic, with_json_body(req).and_then({ - let s = self.clone(); - move |(req, b)| future::ready(s.0.login(&req, b)) - })), - Path::Logout => wrap(CacheControl::PrivateDynamic, with_json_body(req).and_then({ - let s = self.clone(); - move |(req, b)| future::ready(s.0.logout(&req, b)) - })), - Path::Signals => wrap(CacheControl::PrivateDynamic, - Pin::from(self.signals(req, caller))), - Path::Static => wrap(CacheControl::None, self.0.static_file(req)) - } - } -} - #[derive(Debug, Eq, PartialEq)] struct StaticFileRequest<'a> { path: &'a str, @@ -1123,6 +1081,7 @@ mod tests { use log::info; use std::collections::HashMap; use std::str::FromStr; + use std::sync::Arc; use super::{Segments, StaticFileRequest}; struct Server { @@ -1137,17 +1096,17 @@ mod tests { fn new(allow_unauthenticated_permissions: Option) -> Server { let db = TestDb::new(base::clock::RealClocks {}); let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel::<()>(); - let service = super::Service::new(super::Config { + let service = Arc::new(super::Service::new(super::Config { db: db.db.clone(), ui_dir: None, allow_unauthenticated_permissions, trust_forward_hdrs: true, time_zone_name: "".to_owned(), - }).unwrap(); + }).unwrap()); let make_svc = hyper::service::make_service_fn(move |_conn| { futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({ - let mut s = service.clone(); - move |req| std::pin::Pin::from(s.serve(req)) + let s = Arc::clone(&service); + move |req| Arc::clone(&s).serve(req) })) }); let (tx, rx) = std::sync::mpsc::channel(); @@ -1440,6 +1399,7 @@ mod bench { use db::testutil::{self, TestDb}; use hyper; use lazy_static::lazy_static; + use std::sync::Arc; use uuid::Uuid; struct Server { @@ -1452,17 +1412,17 @@ mod bench { let db = TestDb::new(::base::clock::RealClocks {}); let test_camera_uuid = db.test_camera_uuid; testutil::add_dummy_recordings_to_db(&db.db, 1440); - let service = super::Service::new(super::Config { + let service = Arc::new(super::Service::new(super::Config { db: db.db.clone(), ui_dir: None, allow_unauthenticated_permissions: Some(db::Permissions::default()), trust_forward_hdrs: false, time_zone_name: "".to_owned(), - }).unwrap(); + }).unwrap()); let make_svc = hyper::service::make_service_fn(move |_conn| { futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({ - let mut s = service.clone(); - move |req| std::pin::Pin::from(s.serve(req)) + let s = Arc::clone(&service); + move |req| Arc::clone(&s).serve(req) })) }); let mut rt = tokio::runtime::Runtime::new().unwrap();