more readable async web code
This uses "async fn" throughout rather than a mix of async and the older futures style. And it takes advantage of the "self: Arc<Self>" syntax to avoid having a ServiceInner. It was confusing to have some methods on Service and some on ServiceInner; now that distinction is gone. One downside is there's a little more atomic reference-counting. Before, service_fn essentially took an &Arc<Self>, which means it could call Arc::clone where its use of self actually outlived the future (see stream_live_m4s) but didn't need to otherwise. After, it calls an async fn that takes Arc<Self>. Using &Arc<Self> is apparently possible (as of Rust 1.41) but using that with "async fn" means the returned future is tied to its lifetime. The workaround is to use async blocks as described here: <https://rust-lang.github.io/async-book/03_async_await/01_chapter.html> but that's really ugly: it brings back the explicit Future reference, requires futures::future::Either in some cases, and introduces another level of indenting. I think it's better to just pay the arc costs which are probably negligible, or at least cheaper than the boxing was before. Oh, and I make this compile on Rust 1.40 again as it claimed to. http-serve accidentally used the &Arc<Self> thing which broke this. Update to a freshly-pushed commit which doesn't do that.
This commit is contained in:
@ -843,7 +843,7 @@ dependencies = [
name = "http-serve"
version = "0.2.2"
source = "git+https://github.com/scottlamb/http-serve?branch=dir#be4a4039b0bf70c951ee56e2d08d63d48dd5dbb3"
source = "git+https://github.com/scottlamb/http-serve?branch=dir#efde86035aedf6c623c11d1125aa256a3e99e6a2"
dependencies = [
@ -39,7 +39,6 @@ use futures::future::FutureExt;
use hyper::service::{make_service_fn, service_fn};
use log::{info, warn};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
@ -188,13 +187,13 @@ pub async fn run(args: &Args) -> Result<(), Error> {
let time_zone_name = resolve_zone()?;
info!("Resolved timezone: {}", &time_zone_name);
let s = web::Service::new(web::Config {
let svc = Arc::new(web::Service::new(web::Config {
db: db.clone(),
ui_dir: Some(&args.ui_dir),
allow_unauthenticated_permissions: args.allow_unauthenticated_permissions.clone(),
trust_forward_hdrs: args.trust_forward_hdrs,
// Start a streamer for each stream.
let shutdown_streamers = Arc::new(AtomicBool::new(false));
@ -267,8 +266,8 @@ pub async fn run(args: &Args) -> Result<(), Error> {
// Start the web interface.
let make_svc = make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(service_fn({
let mut s = s.clone();
move |req| Pin::from(s.serve(req))
let svc = Arc::clone(&svc);
move |req| Arc::clone(&svc).serve(req)
let server = ::hyper::server::Server::bind(&args.http_addr)
@ -31,7 +31,7 @@
use base::clock::Clocks;
use base::{ErrorKind, bail_t, strutil};
use bytes::Bytes;
use crate::body::{Body, BoxedError};
use crate::body::Body;
use crate::json;
use crate::mp4;
use bytes::{BufMut, BytesMut};
@ -42,7 +42,6 @@ use db::dir::SampleFileDir;
use failure::{Error, bail, format_err};
use fnv::FnvHashMap;
use futures::sink::SinkExt;
use futures::future::{self, Either, Future, TryFutureExt, err};
use futures::stream::StreamExt;
use http::{Request, Response, status::StatusCode};
use http::header::{self, HeaderValue};
@ -56,15 +55,11 @@ use nom::sequence::{preceded, tuple};
use std::cmp;
use std::net::IpAddr;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use tokio_tungstenite::tungstenite;
use url::form_urlencoded;
use uuid::Uuid;
type BoxedFuture = Box<dyn Future<Output = Result<Response<Body>, BoxedError>> +
Sync + Send + 'static>;
#[derive(Debug, Eq, PartialEq)]
enum Path {
TopLevel, // "/api/"
@ -242,18 +237,6 @@ struct Caller {
session: Option<json::Session>,
impl Caller {
struct ServiceInner {
db: Arc<db::Database>,
ui_dir: Option<Arc<FsDir>>,
dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
time_zone_name: String,
allow_unauthenticated_permissions: Option<db::Permissions>,
trust_forward_hdrs: bool,
type ResponseResult = Result<Response<Body>, Response<Body>>;
fn serve_json<T: serde::ser::Serialize>(req: &Request<hyper::Body>, out: &T) -> ResponseResult {
@ -266,7 +249,308 @@ fn serve_json<T: serde::ser::Serialize>(req: &Request<hyper::Body>, out: &T) ->
impl ServiceInner {
fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool {
let mut b64 = [0u8; 32];
session.encode_base64(&mut b64);
::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok()
/// Extracts `s` cookie from the HTTP request. Does not authenticate.
fn extract_sid(req: &Request<hyper::Body>) -> Option<auth::RawSessionId> {
let hdr = match req.headers().get(header::COOKIE) {
None => return None,
Some(c) => c,
for mut cookie in hdr.as_bytes().split(|&b| b == b';') {
if cookie.starts_with(b" ") {
cookie = &cookie[1..];
if cookie.starts_with(b"s=") {
let s = &cookie[2..];
if let Ok(s) = auth::RawSessionId::decode_base64(s) {
return Some(s);
/// Extracts an `application/json` POST body from a request.
/// This returns the request body as bytes rather than performing
/// deserialization. Keeping the bytes allows the caller to use a `Deserialize`
/// that borrows from the bytes.
async fn extract_json_body(req: &mut Request<hyper::Body>) -> Result<Bytes, Response<Body>> {
if *req.method() != http::method::Method::POST {
return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected"));
let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) {
Some(t) if t == "application/json" => true,
Some(t) if t == "application/json; charset=UTF-8" => true,
_ => false,
if !correct_mime_type {
return Err(bad_req("expected application/json request body"));
let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty());
.map_err(|e| internal_server_err(format_err!("unable to read request body: {}", e)))
pub struct Config<'a> {
pub db: Arc<db::Database>,
pub ui_dir: Option<&'a std::path::Path>,
pub trust_forward_hdrs: bool,
pub time_zone_name: String,
pub allow_unauthenticated_permissions: Option<db::Permissions>,
pub struct Service {
db: Arc<db::Database>,
ui_dir: Option<Arc<FsDir>>,
dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
time_zone_name: String,
allow_unauthenticated_permissions: Option<db::Permissions>,
trust_forward_hdrs: bool,
/// Useful HTTP `Cache-Control` values to set on successful (HTTP 200) API responses.
enum CacheControl {
/// For endpoints which have private data that may change from request to request.
/// For endpoints which rarely change for a given URL.
/// E.g., a fixed segment of video. The underlying video logically never changes; there may
/// rarely be some software change to the actual bytes (which would result in a new etag) so
/// (unlike the content-hashed static content) it's not entirely immutable.
impl Service {
pub fn new(config: Config) -> Result<Self, Error> {
let mut ui_dir = None;
if let Some(d) = config.ui_dir {
match FsDir::builder().for_path(&d) {
Err(e) => {
warn!("Unable to load --ui-dir={}; will serve no static files: {}",
d.display(), e);
Ok(d) => ui_dir = Some(d),
let dirs_by_stream_id = {
let l = config.db.lock();
let mut d =
FnvHashMap::with_capacity_and_hasher(l.streams_by_id().len(), Default::default());
for (&id, s) in l.streams_by_id().iter() {
let dir_id = match s.sample_file_dir_id {
Some(d) => d,
None => continue,
d.insert(id, l.sample_file_dirs_by_id()
Ok(Service {
db: config.db,
allow_unauthenticated_permissions: config.allow_unauthenticated_permissions,
trust_forward_hdrs: config.trust_forward_hdrs,
time_zone_name: config.time_zone_name,
fn stream_live_m4s(self: Arc<Self>, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid,
stream_type: db::StreamType) -> ResponseResult {
if !caller.permissions.view_video {
return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required"));
let stream_id;
let open_id;
let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded();
let mut db = self.db.lock();
open_id = match db.open {
None => return Err(plain_response(
"database is read-only; there are no live streams")),
Some(o) => o.id,
let camera = db.get_camera(uuid)
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
stream_id = camera.streams[stream_type.index()]
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid,
db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok()))
.expect("stream_id refed by camera");
let (parts, body) = req.into_parts();
let req = Request::from_parts(parts, ());
let response = tungstenite::handshake::server::create_response(&req)
.map_err(|e| bad_req(e.to_string()))?;
let (parts, ()) = response.into_parts();
tokio::spawn(self.stream_live_m4s_ws(stream_id, open_id, body, sub_rx));
Ok(Response::from_parts(parts, Body::from("")))
async fn stream_live_m4s_ws(
self: Arc<Self>, stream_id: i32, open_id: u32, body: hyper::Body,
mut sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>) {
let upgraded = match body.on_upgrade().await {
Ok(u) => u,
Err(e) => {
warn!("Unable to upgrade stream to websocket: {}", e);
let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
loop {
let live = match sub_rx.next().await {
Some(l) => l,
None => return,
if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await {
info!("Dropping WebSocket after error: {}", e);
async fn stream_live_m4s_chunk(
&self, open_id: u32, stream_id: i32,
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
live: db::LiveSegment) -> Result<(), Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
let mut vse_id = None;
let mut start = None;
let db = self.db.lock();
let mut rows = 0;
db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| {
rows += 1;
let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id)
vse_id = Some(strutil::hex(&vse.sha1));
start = Some(r.start);
builder.append(&db, r, live.off_90k.clone())?;
if rows != 1 {
bail_t!(Internal, "unable to find {:?}", live);
let vse_id = vse_id.unwrap();
let start = start.unwrap();
use http_serve::Entity;
let mp4 = builder.build(self.db.clone(), self.dirs_by_stream_id.clone())?;
let mut hdrs = header::HeaderMap::new();
mp4.add_headers(&mut hdrs);
let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap();
let hdr = format!(
"Content-Type: {}\r\n\
X-Recording-Start: {}\r\n\
X-Recording-Id: {}.{}\r\n\
X-Time-Range: {}-{}\r\n\
X-Video-Sample-Entry-Sha1: {}\r\n\r\n",
let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/;
mp4.append_into_vec(&mut v).await?;
//let v = Pin::into_inner();
async fn signals(&self, req: Request<hyper::Body>, caller: Caller) -> ResponseResult {
use http::method::Method;
match *req.method() {
Method::POST => self.post_signals(req, caller).await,
Method::GET | Method::HEAD => self.get_signals(&req),
_ => Err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST, GET, or HEAD expected")),
async fn serve_inner(self: Arc<Self>, req: Request<::hyper::Body>, p: Path, caller: Caller)
-> ResponseResult {
let (cache, mut response) = match p {
Path::InitSegment(sha1, debug) => {
(CacheControl::PrivateStatic, self.init_segment(sha1, debug, &req)?)
Path::TopLevel => (CacheControl::PrivateDynamic, self.top_level(&req, caller)?),
Path::Request => (CacheControl::PrivateDynamic, self.request(&req)?),
Path::Camera(uuid) => (CacheControl::PrivateDynamic, self.camera(&req, uuid)?),
Path::StreamRecordings(uuid, type_) => {
(CacheControl::PrivateDynamic, self.stream_recordings(&req, uuid, type_)?)
Path::StreamViewMp4(uuid, type_, debug) => {
self.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::Normal, debug)?)
Path::StreamViewMp4Segment(uuid, type_, debug) => {
self.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::MediaSegment, debug)?)
Path::StreamLiveMp4Segments(uuid, type_) => {
(CacheControl::PrivateDynamic, self.stream_live_m4s(req, caller, uuid, type_)?)
Path::NotFound => return Err(not_found("path not understood")),
Path::Login => (CacheControl::PrivateDynamic, self.login(req).await?),
Path::Logout => (CacheControl::PrivateDynamic, self.logout(req).await?),
Path::Signals => (CacheControl::PrivateDynamic, self.signals(req, caller).await?),
Path::Static => (CacheControl::None, self.static_file(req).await?)
match cache {
CacheControl::PrivateStatic => {
HeaderValue::from_static("private, max-age=3600"));
CacheControl::PrivateDynamic => {
HeaderValue::from_static("private, no-cache"));
CacheControl::None => {},
pub async fn serve(self: Arc<Self>, req: Request<::hyper::Body>)
-> Result<Response<Body>, std::convert::Infallible> {
let p = Path::decode(req.uri().path());
let always_allow_unauthenticated = match p {
Path::NotFound | Path::Request | Path::Login | Path::Logout | Path::Static => true,
_ => false,
debug!("request on: {}: {:?}", req.uri(), p);
let caller = match self.authenticate(&req, always_allow_unauthenticated) {
Ok(c) => c,
Err(e) => return Ok(from_base_error(e)),
Ok(self.serve_inner(req, p, caller).await.unwrap_or_else(|e| e))
fn top_level(&self, req: &Request<::hyper::Body>, caller: Caller) -> ResponseResult {
let mut days = false;
let mut camera_configs = false;
@ -514,39 +798,31 @@ impl ServiceInner {
Ok(http_serve::serve(mp4, req))
fn static_file(&self, req: Request<hyper::Body>)
-> impl Future<Output = ResponseResult> + 'static {
let dir = match self.ui_dir.clone() {
None => {
return Either::Left(
err(not_found("--ui-dir not configured; no static files available.")))
Some(d) => d,
async fn static_file(&self, req: Request<hyper::Body>) -> ResponseResult {
let dir = self.ui_dir.clone()
.ok_or_else(|| not_found("--ui-dir not configured; no static files available."))?;
let static_req = match StaticFileRequest::parse(req.uri().path()) {
None => return Err(not_found("static file not found")),
Some(r) => r,
Either::Right(async move {
let static_req = match StaticFileRequest::parse(req.uri().path()) {
None => return Err(not_found("static file not found")),
Some(r) => r,
let f = dir.get(static_req.path, req.headers());
let node = f.await
.map_err(|e| if e.kind() == std::io::ErrorKind::NotFound {
not_found("no such static file")
} else {
let mut hdrs = http::HeaderMap::new();
node.add_encoding_headers(&mut hdrs);
hdrs.insert(header::CACHE_CONTROL, HeaderValue::from_static(if static_req.immutable {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#Caching_static_assets
"public, max-age=604800, immutable"
let f = dir.get(static_req.path, req.headers());
let node = f.await
.map_err(|e| if e.kind() == std::io::ErrorKind::NotFound {
not_found("no such static file")
} else {
hdrs.insert(header::CONTENT_TYPE, HeaderValue::from_static(static_req.mime));
let e = node.into_file_entity(hdrs).map_err(internal_server_err)?;
Ok(http_serve::serve(e, &req))
let mut hdrs = http::HeaderMap::new();
node.add_encoding_headers(&mut hdrs);
hdrs.insert(header::CACHE_CONTROL, HeaderValue::from_static(if static_req.immutable {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#Caching_static_assets
"public, max-age=604800, immutable"
} else {
hdrs.insert(header::CONTENT_TYPE, HeaderValue::from_static(static_req.mime));
let e = node.into_file_entity(hdrs).map_err(internal_server_err)?;
Ok(http_serve::serve(e, &req))
fn authreq(&self, req: &Request<::hyper::Body>) -> auth::Request {
@ -588,10 +864,11 @@ impl ServiceInner {
fn login(&self, req: &Request<::hyper::Body>, body: Bytes) -> ResponseResult {
let r: json::LoginRequest = serde_json::from_slice(&body)
async fn login(&self, mut req: Request<::hyper::Body>) -> ResponseResult {
let r = extract_json_body(&mut req).await?;
let r: json::LoginRequest = serde_json::from_slice(&r)
.map_err(|e| bad_req(e.to_string()))?;
let authreq = self.authreq(req);
let authreq = self.authreq(&req);
let host = req.headers().get(header::HOST).ok_or_else(|| bad_req("missing Host header!"))?;
let host = host.as_bytes();
let domain = match memchr(b':', host) {
@ -599,7 +876,7 @@ impl ServiceInner {
None => host,
let mut l = self.db.lock();
let is_secure = self.is_secure(req);
let is_secure = self.is_secure(&req);
let flags = (auth::SessionFlag::HttpOnly as i32) |
(auth::SessionFlag::SameSite as i32) |
(auth::SessionFlag::SameSiteStrict as i32) |
@ -625,13 +902,14 @@ impl ServiceInner {
fn logout(&self, req: &Request<hyper::Body>, body: Bytes) -> ResponseResult {
let r: json::LogoutRequest = serde_json::from_slice(&body)
async fn logout(&self, mut req: Request<hyper::Body>) -> ResponseResult {
let r = extract_json_body(&mut req).await?;
let r: json::LogoutRequest = serde_json::from_slice(&r)
.map_err(|e| bad_req(e.to_string()))?;
let mut res = Response::new(b""[..].into());
if let Some(sid) = extract_sid(req) {
let authreq = self.authreq(req);
if let Some(sid) = extract_sid(&req) {
let authreq = self.authreq(&req);
let mut l = self.db.lock();
let hash = sid.hash();
let need_revoke = match l.authenticate_session(authreq.clone(), &hash) {
@ -666,12 +944,13 @@ impl ServiceInner {
fn post_signals(&self, req: &Request<hyper::Body>, caller: Caller, body: Bytes)
-> ResponseResult {
async fn post_signals(&self, mut req: Request<hyper::Body>, caller: Caller)
-> ResponseResult {
if !caller.permissions.update_signals {
return Err(plain_response(StatusCode::UNAUTHORIZED, "update_signals required"));
let r: json::PostSignalsRequest = serde_json::from_slice(&body)
let r = extract_json_body(&mut req).await?;
let r: json::PostSignalsRequest = serde_json::from_slice(&r)
.map_err(|e| bad_req(e.to_string()))?;
let mut l = self.db.lock();
let now = recording::Time::new(self.db.clocks().realtime());
@ -684,7 +963,7 @@ impl ServiceInner {
l.update_signals(start .. end, &r.signal_ids, &r.states).map_err(from_base_error)?;
serve_json(req, &json::PostSignalsResponse {
serve_json(&req, &json::PostSignalsResponse {
time_90k: now.0,
@ -754,327 +1033,6 @@ impl ServiceInner {
fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool {
let mut b64 = [0u8; 32];
session.encode_base64(&mut b64);
::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok()
/// Extracts `s` cookie from the HTTP request. Does not authenticate.
fn extract_sid(req: &Request<hyper::Body>) -> Option<auth::RawSessionId> {
let hdr = match req.headers().get(header::COOKIE) {
None => return None,
Some(c) => c,
for mut cookie in hdr.as_bytes().split(|&b| b == b';') {
if cookie.starts_with(b" ") {
cookie = &cookie[1..];
if cookie.starts_with(b"s=") {
let s = &cookie[2..];
if let Ok(s) = auth::RawSessionId::decode_base64(s) {
return Some(s);
/// Returns a future separating the request from its JSON body.
/// If this is not a `POST` or the body's `Content-Type` is not
/// `application/json`, returns an appropriate error response instead.
/// Use with `and_then` to chain logic which consumes the form body.
async fn with_json_body(mut req: Request<hyper::Body>)
-> Result<(Request<hyper::Body>, Bytes), Response<Body>> {
if *req.method() != http::method::Method::POST {
return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected"));
let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) {
Some(t) if t == "application/json" => true,
Some(t) if t == "application/json; charset=UTF-8" => true,
_ => false,
if !correct_mime_type {
return Err(bad_req("expected application/json request body"));
let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty());
match hyper::body::to_bytes(b).await {
Ok(b) => Ok((req, b)),
Err(e) => Err(internal_server_err(format_err!("unable to read request body: {}", e))),
pub struct Config<'a> {
pub db: Arc<db::Database>,
pub ui_dir: Option<&'a std::path::Path>,
pub trust_forward_hdrs: bool,
pub time_zone_name: String,
pub allow_unauthenticated_permissions: Option<db::Permissions>,
pub struct Service(Arc<ServiceInner>);
/// Useful HTTP `Cache-Control` values to set on successful (HTTP 200) API responses.
enum CacheControl {
/// For endpoints which have private data that may change from request to request.
/// For endpoints which rarely change for a given URL.
/// E.g., a fixed segment of video. The underlying video logically never changes; there may
/// rarely be some software change to the actual bytes (which would result in a new etag) so
/// (unlike the content-hashed static content) it's not entirely immutable.
impl Service {
pub fn new(config: Config) -> Result<Self, Error> {
let mut ui_dir = None;
if let Some(d) = config.ui_dir {
match FsDir::builder().for_path(&d) {
Err(e) => {
warn!("Unable to load --ui-dir={}; will serve no static files: {}",
d.display(), e);
Ok(d) => ui_dir = Some(d),
let dirs_by_stream_id = {
let l = config.db.lock();
let mut d =
FnvHashMap::with_capacity_and_hasher(l.streams_by_id().len(), Default::default());
for (&id, s) in l.streams_by_id().iter() {
let dir_id = match s.sample_file_dir_id {
Some(d) => d,
None => continue,
d.insert(id, l.sample_file_dirs_by_id()
Ok(Service(Arc::new(ServiceInner {
db: config.db,
allow_unauthenticated_permissions: config.allow_unauthenticated_permissions,
trust_forward_hdrs: config.trust_forward_hdrs,
time_zone_name: config.time_zone_name,
fn stream_live_m4s(&self, req: Request<::hyper::Body>, caller: Caller, uuid: Uuid,
stream_type: db::StreamType) -> ResponseResult {
if !caller.permissions.view_video {
return Err(plain_response(StatusCode::UNAUTHORIZED, "view_video required"));
let stream_id;
let open_id;
let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded();
let mut db = self.0.db.lock();
open_id = match db.open {
None => return Err(plain_response(
"database is read-only; there are no live streams")),
Some(o) => o.id,
let camera = db.get_camera(uuid)
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such camera {}", uuid)))?;
stream_id = camera.streams[stream_type.index()]
.ok_or_else(|| plain_response(StatusCode::NOT_FOUND,
format!("no such stream {}/{}", uuid,
db.watch_live(stream_id, Box::new(move |l| sub_tx.unbounded_send(l).is_ok()))
.expect("stream_id refed by camera");
let (parts, body) = req.into_parts();
let req = Request::from_parts(parts, ());
let response = tungstenite::handshake::server::create_response(&req)
.map_err(|e| bad_req(e.to_string()))?;
let (parts, ()) = response.into_parts();
tokio::spawn(self.clone().stream_live_m4s_ws(stream_id, open_id, body, sub_rx));
Ok(Response::from_parts(parts, Body::from("")))
async fn stream_live_m4s_ws(
self, stream_id: i32, open_id: u32, body: hyper::Body,
mut sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>) {
let upgraded = match body.on_upgrade().await {
Ok(u) => u,
Err(e) => {
warn!("Unable to upgrade stream to websocket: {}", e);
let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
loop {
let live = match sub_rx.next().await {
Some(l) => l,
None => return,
if let Err(e) = self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live).await {
info!("Dropping WebSocket after error: {}", e);
async fn stream_live_m4s_chunk(
&self, open_id: u32, stream_id: i32,
ws: &mut tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
live: db::LiveSegment) -> Result<(), Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
let mut vse_id = None;
let mut start = None;
let db = self.0.db.lock();
let mut rows = 0;
db.list_recordings_by_id(stream_id, live.recording .. live.recording+1, &mut |r| {
rows += 1;
let vse = db.video_sample_entries_by_id().get(&r.video_sample_entry_id)
vse_id = Some(strutil::hex(&vse.sha1));
start = Some(r.start);
builder.append(&db, r, live.off_90k.clone())?;
if rows != 1 {
bail_t!(Internal, "unable to find {:?}", live);
let vse_id = vse_id.unwrap();
let start = start.unwrap();
use http_serve::Entity;
let mp4 = builder.build(self.0.db.clone(), self.0.dirs_by_stream_id.clone())?;
let mut hdrs = header::HeaderMap::new();
mp4.add_headers(&mut hdrs);
let mime_type = hdrs.get(header::CONTENT_TYPE).unwrap();
let hdr = format!(
"Content-Type: {}\r\n\
X-Recording-Start: {}\r\n\
X-Recording-Id: {}.{}\r\n\
X-Time-Range: {}-{}\r\n\
X-Video-Sample-Entry-Sha1: {}\r\n\r\n",
let mut v = /*Pin::from(*/hdr.into_bytes()/*)*/;
mp4.append_into_vec(&mut v).await?;
//let v = Pin::into_inner();
fn signals(&self, req: Request<hyper::Body>, caller: Caller)
-> Box<dyn Future<Output = Result<Response<Body>, Response<Body>>> + Send + Sync + 'static> {
use http::method::Method;
match *req.method() {
Method::POST => Box::new(with_json_body(req)
let s = self.0.clone();
move |(req, b)| future::ready(s.post_signals(&req, caller, b))
Method::GET | Method::HEAD => Box::new(future::ready(self.0.get_signals(&req))),
_ => Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST, GET, or HEAD expected"))),
pub fn serve(&mut self, req: Request<::hyper::Body>) -> BoxedFuture {
fn wrap<R>(cache_hdr: CacheControl, r: R) -> BoxedFuture
where R: Future<Output = Result<Response<Body>, Response<Body>>> + Send + Sync + 'static {
return Box::new(r.or_else(|e| futures::future::ok(e)).map_ok(move |mut r| {
match cache_hdr {
CacheControl::PrivateStatic => {
HeaderValue::from_static("private, max-age=3600"));
CacheControl::PrivateDynamic => {
HeaderValue::from_static("private, no-cache"));
CacheControl::None => {},
fn wrap_r(cache_hdr: CacheControl, r: ResponseResult)
-> Box<dyn Future<Output = Result<Response<Body>, BoxedError>> + Send + Sync + 'static> {
return wrap(cache_hdr, future::ready(r))
let p = Path::decode(req.uri().path());
let always_allow_unauthenticated = match p {
Path::NotFound | Path::Request | Path::Login | Path::Logout | Path::Static => true,
_ => false,
debug!("request on: {}: {:?}", req.uri(), p);
let caller = match self.0.authenticate(&req, always_allow_unauthenticated) {
Ok(c) => c,
Err(e) => return Box::new(future::ok(from_base_error(e))),
match p {
Path::InitSegment(sha1, debug) => {
wrap_r(CacheControl::PrivateStatic, self.0.init_segment(sha1, debug, &req))
Path::TopLevel => wrap_r(CacheControl::PrivateDynamic, self.0.top_level(&req, caller)),
Path::Request => wrap_r(CacheControl::PrivateDynamic, self.0.request(&req)),
Path::Camera(uuid) => wrap_r(CacheControl::PrivateDynamic, self.0.camera(&req, uuid)),
Path::StreamRecordings(uuid, type_) => {
wrap_r(CacheControl::PrivateDynamic, self.0.stream_recordings(&req, uuid, type_))
Path::StreamViewMp4(uuid, type_, debug) => {
self.0.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::Normal, debug))
Path::StreamViewMp4Segment(uuid, type_, debug) => {
self.0.stream_view_mp4(&req, caller, uuid, type_, mp4::Type::MediaSegment,
Path::StreamLiveMp4Segments(uuid, type_) => {
wrap_r(CacheControl::PrivateDynamic, self.stream_live_m4s(req, caller, uuid, type_))
Path::NotFound => wrap(CacheControl::PrivateDynamic,
future::err(not_found("path not understood"))),
Path::Login => wrap(CacheControl::PrivateDynamic, with_json_body(req).and_then({
let s = self.clone();
move |(req, b)| future::ready(s.0.login(&req, b))
Path::Logout => wrap(CacheControl::PrivateDynamic, with_json_body(req).and_then({
let s = self.clone();
move |(req, b)| future::ready(s.0.logout(&req, b))
Path::Signals => wrap(CacheControl::PrivateDynamic,
Pin::from(self.signals(req, caller))),
Path::Static => wrap(CacheControl::None, self.0.static_file(req))
#[derive(Debug, Eq, PartialEq)]
struct StaticFileRequest<'a> {
path: &'a str,
@ -1123,6 +1081,7 @@ mod tests {
use log::info;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use super::{Segments, StaticFileRequest};
struct Server {
@ -1137,17 +1096,17 @@ mod tests {
fn new(allow_unauthenticated_permissions: Option<db::Permissions>) -> Server {
let db = TestDb::new(base::clock::RealClocks {});
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel::<()>();
let service = super::Service::new(super::Config {
let service = Arc::new(super::Service::new(super::Config {
db: db.db.clone(),
ui_dir: None,
trust_forward_hdrs: true,
time_zone_name: "".to_owned(),
let make_svc = hyper::service::make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({
let mut s = service.clone();
move |req| std::pin::Pin::from(s.serve(req))
let s = Arc::clone(&service);
move |req| Arc::clone(&s).serve(req)
let (tx, rx) = std::sync::mpsc::channel();
@ -1440,6 +1399,7 @@ mod bench {
use db::testutil::{self, TestDb};
use hyper;
use lazy_static::lazy_static;
use std::sync::Arc;
use uuid::Uuid;
struct Server {
@ -1452,17 +1412,17 @@ mod bench {
let db = TestDb::new(::base::clock::RealClocks {});
let test_camera_uuid = db.test_camera_uuid;
testutil::add_dummy_recordings_to_db(&db.db, 1440);
let service = super::Service::new(super::Config {
let service = Arc::new(super::Service::new(super::Config {
db: db.db.clone(),
ui_dir: None,
allow_unauthenticated_permissions: Some(db::Permissions::default()),
trust_forward_hdrs: false,
time_zone_name: "".to_owned(),
let make_svc = hyper::service::make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({
let mut s = service.clone();
move |req| std::pin::Pin::from(s.serve(req))
let s = Arc::clone(&service);
move |req| Arc::clone(&s).serve(req)
let mut rt = tokio::runtime::Runtime::new().unwrap();
