mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-11-25 20:16:11 -05:00
upgrade to async hyper
serve_generated_bytes is >3X faster. One caveat is that the reactor thread may stall when reading from the memory-mapped slice. Moonfire NVR is basically a single-user program, so that may not be so bad, but we'll see.
This commit is contained in:
342
src/web.rs
342
src/web.rs
@@ -35,15 +35,20 @@ use core::str::FromStr;
|
||||
use db;
|
||||
use dir::SampleFileDir;
|
||||
use error::Error;
|
||||
use futures::Stream;
|
||||
use futures::{future, stream};
|
||||
use json;
|
||||
use http_entity;
|
||||
use hyper::{header,server,status};
|
||||
use hyper::uri::RequestUri;
|
||||
use hyper::{header, status};
|
||||
use hyper::server::{self, Request, Response};
|
||||
use mime;
|
||||
use mp4;
|
||||
use parking_lot::MutexGuard;
|
||||
use recording;
|
||||
use reffers::ARefs;
|
||||
use regex::Regex;
|
||||
use serde_json;
|
||||
use slices;
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::io::Write;
|
||||
@@ -75,17 +80,6 @@ enum Path {
|
||||
NotFound,
|
||||
}
|
||||
|
||||
fn get_path_and_query(uri: &RequestUri) -> (&str, &str) {
|
||||
match *uri {
|
||||
RequestUri::AbsolutePath(ref both) => match both.find('?') {
|
||||
Some(split) => (&both[..split], &both[split+1..]),
|
||||
None => (both, ""),
|
||||
},
|
||||
RequestUri::AbsoluteUri(ref u) => (u.path(), u.query().unwrap_or("")),
|
||||
_ => ("", ""),
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_path(path: &str) -> Path {
|
||||
if path == "/" {
|
||||
return Path::CamerasList;
|
||||
@@ -116,8 +110,8 @@ fn decode_path(path: &str) -> Path {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_json(req: &server::Request) -> bool {
|
||||
if let Some(accept) = req.headers.get::<header::Accept>() {
|
||||
fn is_json(req: &Request) -> bool {
|
||||
if let Some(accept) = req.headers().get::<header::Accept>() {
|
||||
return accept.len() == 1 && accept[0].item == *JSON &&
|
||||
accept[0].quality == header::Quality(1000);
|
||||
}
|
||||
@@ -182,11 +176,6 @@ impl fmt::Display for HumanizedTimestamp {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Handler {
|
||||
db: Arc<db::Database>,
|
||||
dir: Arc<SampleFileDir>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
struct Segments {
|
||||
ids: Range<i32>,
|
||||
@@ -227,33 +216,40 @@ impl Segments {
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
pub struct Service {
|
||||
db: Arc<db::Database>,
|
||||
dir: Arc<SampleFileDir>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn new(db: Arc<db::Database>, dir: Arc<SampleFileDir>) -> Self {
|
||||
Handler{db: db, dir: dir}
|
||||
Service{db: db, dir: dir}
|
||||
}
|
||||
|
||||
fn not_found(&self, mut res: server::Response) -> Result<(), Error> {
|
||||
*res.status_mut() = status::StatusCode::NotFound;
|
||||
res.send(b"not found")?;
|
||||
Ok(())
|
||||
fn not_found(&self) -> Result<Response<slices::Body>, Error> {
|
||||
Ok(Response::new()
|
||||
.with_status(status::StatusCode::NotFound)
|
||||
.with_header(header::ContentType(mime!(Text/Plain)))
|
||||
.with_body(stream::once(Ok(ARefs::new(&b"not found"[..]))).boxed()))
|
||||
}
|
||||
|
||||
fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<(), Error> {
|
||||
fn list_cameras(&self, req: &Request) -> Result<Response<slices::Body>, Error> {
|
||||
let json = is_json(req);
|
||||
let buf = {
|
||||
let db = self.db.lock();
|
||||
if json {
|
||||
serde_json::to_vec(&json::ListCameras{cameras: db.cameras_by_id()})?
|
||||
} else {
|
||||
self.list_cameras_html(&db)?
|
||||
self.list_cameras_html(db)?
|
||||
}
|
||||
};
|
||||
res.headers_mut().set(header::ContentType(if json { JSON.clone() } else { HTML.clone() }));
|
||||
res.send(&buf)?;
|
||||
Ok(())
|
||||
Ok(Response::new()
|
||||
.with_header(header::ContentType(if json { JSON.clone() } else { HTML.clone() }))
|
||||
.with_header(header::ContentLength(buf.len() as u64))
|
||||
.with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
|
||||
}
|
||||
|
||||
fn list_cameras_html(&self, db: &db::LockedDatabase) -> Result<Vec<u8>, Error> {
|
||||
fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>, Error> {
|
||||
let mut buf = Vec::new();
|
||||
buf.extend_from_slice(b"\
|
||||
<!DOCTYPE html>\n\
|
||||
@@ -287,8 +283,8 @@ impl Handler {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
fn camera(&self, uuid: Uuid, query: &str, req: &server::Request, mut res: server::Response)
|
||||
-> Result<(), Error> {
|
||||
fn camera(&self, uuid: Uuid, query: Option<&str>, req: &Request)
|
||||
-> Result<Response<slices::Body>, Error> {
|
||||
let json = is_json(req);
|
||||
let buf = {
|
||||
let db = self.db.lock();
|
||||
@@ -297,28 +293,31 @@ impl Handler {
|
||||
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
|
||||
serde_json::to_vec(&json::Camera::new(camera, true))?
|
||||
} else {
|
||||
self.camera_html(&db, query, uuid)?
|
||||
self.camera_html(db, query, uuid)?
|
||||
}
|
||||
};
|
||||
res.headers_mut().set(header::ContentType(if json { JSON.clone() } else { HTML.clone() }));
|
||||
res.send(&buf)?;
|
||||
Ok(())
|
||||
Ok(Response::new()
|
||||
.with_header(header::ContentType(if json { JSON.clone() } else { HTML.clone() }))
|
||||
.with_header(header::ContentLength(buf.len() as u64))
|
||||
.with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
|
||||
}
|
||||
|
||||
fn camera_html(&self, db: &db::LockedDatabase, query: &str, uuid: Uuid)
|
||||
-> Result<Vec<u8>, Error> {
|
||||
fn camera_html(&self, db: MutexGuard<db::LockedDatabase>, query: Option<&str>,
|
||||
uuid: Uuid) -> Result<Vec<u8>, Error> {
|
||||
let (r, trim) = {
|
||||
let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
|
||||
let mut trim = false;
|
||||
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"start_time" => time.start = recording::Time::parse(value)?,
|
||||
"end_time" => time.end = recording::Time::parse(value)?,
|
||||
"trim" if value == "true" => trim = true,
|
||||
_ => {},
|
||||
}
|
||||
};
|
||||
if let Some(q) = query {
|
||||
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"start_time" => time.start = recording::Time::parse(value)?,
|
||||
"end_time" => time.end = recording::Time::parse(value)?,
|
||||
"trim" if value == "true" => trim = true,
|
||||
_ => {},
|
||||
}
|
||||
};
|
||||
}
|
||||
(time, trim)
|
||||
};
|
||||
let camera = db.get_camera(uuid)
|
||||
@@ -396,13 +395,14 @@ impl Handler {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
fn camera_recordings(&self, uuid: Uuid, query: &str, req: &server::Request,
|
||||
mut res: server::Response) -> Result<(), Error> {
|
||||
let r = Handler::get_optional_range(query)?;
|
||||
fn camera_recordings(&self, uuid: Uuid, query: Option<&str>, req: &Request)
|
||||
-> Result<Response<slices::Body>, Error> {
|
||||
let r = Service::get_optional_range(query)?;
|
||||
if !is_json(req) {
|
||||
*res.status_mut() = status::StatusCode::NotAcceptable;
|
||||
res.send(b"only available for JSON requests")?;
|
||||
return Ok(());
|
||||
return Ok(Response::new()
|
||||
.with_status(status::StatusCode::NotAcceptable)
|
||||
.with_body(stream::once(
|
||||
Ok(ARefs::new(&b"only available for JSON requests"[..]))).boxed()));
|
||||
}
|
||||
let mut out = json::ListRecordings{recordings: Vec::new()};
|
||||
{
|
||||
@@ -424,13 +424,14 @@ impl Handler {
|
||||
})?;
|
||||
}
|
||||
let buf = serde_json::to_vec(&out)?;
|
||||
res.headers_mut().set(header::ContentType(JSON.clone()));
|
||||
res.send(&buf)?;
|
||||
Ok(())
|
||||
Ok(Response::new()
|
||||
.with_header(header::ContentType(JSON.clone()))
|
||||
.with_header(header::ContentLength(buf.len() as u64))
|
||||
.with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
|
||||
}
|
||||
|
||||
fn camera_view_mp4(&self, uuid: Uuid, query: &str, req: &server::Request,
|
||||
res: server::Response) -> Result<(), Error> {
|
||||
fn camera_view_mp4(&self, uuid: Uuid, query: Option<&str>, req: &Request)
|
||||
-> Result<Response<slices::Body>, Error> {
|
||||
let camera_id = {
|
||||
let db = self.db.lock();
|
||||
let camera = db.get_camera(uuid)
|
||||
@@ -438,117 +439,127 @@ impl Handler {
|
||||
camera.id
|
||||
};
|
||||
let mut builder = mp4::FileBuilder::new();
|
||||
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"s" => {
|
||||
let s = Segments::parse(value).map_err(
|
||||
|_| Error::new(format!("invalid s parameter: {}", value)))?;
|
||||
debug!("camera_view_mp4: appending s={:?}", s);
|
||||
let mut est_segments = (s.ids.end - s.ids.start) as usize;
|
||||
if let Some(end) = s.end_time {
|
||||
// There should be roughly ceil((end - start) / desired_recording_duration)
|
||||
// recordings in the desired timespan if there are no gaps or overlap,
|
||||
// possibly another for misalignment of the requested timespan with the
|
||||
// rotate offset and another because rotation only happens at key frames.
|
||||
let ceil_durations = (end - s.start_time +
|
||||
recording::DESIRED_RECORDING_DURATION - 1) /
|
||||
recording::DESIRED_RECORDING_DURATION;
|
||||
est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize);
|
||||
}
|
||||
builder.reserve(est_segments);
|
||||
let db = self.db.lock();
|
||||
let mut prev = None;
|
||||
let mut cur_off = 0;
|
||||
db.list_recordings_by_id(camera_id, s.ids.clone(), |r| {
|
||||
if let Some(q) = query {
|
||||
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"s" => {
|
||||
let s = Segments::parse(value).map_err(
|
||||
|_| Error::new(format!("invalid s parameter: {}", value)))?;
|
||||
debug!("camera_view_mp4: appending s={:?}", s);
|
||||
let mut est_segments = (s.ids.end - s.ids.start) as usize;
|
||||
if let Some(end) = s.end_time {
|
||||
// There should be roughly ceil((end - start) /
|
||||
// desired_recording_duration) recordings in the desired timespan if
|
||||
// there are no gaps or overlap, possibly another for misalignment of
|
||||
// the requested timespan with the rotate offset and another because
|
||||
// rotation only happens at key frames.
|
||||
let ceil_durations = (end - s.start_time +
|
||||
recording::DESIRED_RECORDING_DURATION - 1) /
|
||||
recording::DESIRED_RECORDING_DURATION;
|
||||
est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize);
|
||||
}
|
||||
builder.reserve(est_segments);
|
||||
let db = self.db.lock();
|
||||
let mut prev = None;
|
||||
let mut cur_off = 0;
|
||||
db.list_recordings_by_id(camera_id, s.ids.clone(), |r| {
|
||||
// Check for missing recordings.
|
||||
match prev {
|
||||
None if r.id == s.ids.start => {},
|
||||
None => return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, s.ids.start))),
|
||||
Some(id) if r.id != id + 1 => {
|
||||
return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, id + 1)));
|
||||
},
|
||||
_ => {},
|
||||
};
|
||||
prev = Some(r.id);
|
||||
|
||||
// Add a segment for the relevant part of the recording, if any.
|
||||
let end_time = s.end_time.unwrap_or(i64::max_value());
|
||||
let d = r.duration_90k as i64;
|
||||
if s.start_time <= cur_off + d && cur_off < end_time {
|
||||
let start = cmp::max(0, s.start_time - cur_off);
|
||||
let end = cmp::min(d, end_time - cur_off);
|
||||
let times = start as i32 .. end as i32;
|
||||
debug!("...appending recording {}/{} with times {:?} (out of dur {})",
|
||||
r.camera_id, r.id, times, d);
|
||||
builder.append(&db, r, start as i32 .. end as i32)?;
|
||||
} else {
|
||||
debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d);
|
||||
}
|
||||
cur_off += d;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// Check for missing recordings.
|
||||
match prev {
|
||||
None if r.id == s.ids.start => {},
|
||||
None => return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, s.ids.start))),
|
||||
Some(id) if r.id != id + 1 => {
|
||||
Some(id) if s.ids.end != id + 1 => {
|
||||
return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, id + 1)));
|
||||
camera_id, s.ids.end - 1)));
|
||||
},
|
||||
None => {
|
||||
return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, s.ids.start)));
|
||||
},
|
||||
_ => {},
|
||||
};
|
||||
prev = Some(r.id);
|
||||
|
||||
// Add a segment for the relevant part of the recording, if any.
|
||||
let end_time = s.end_time.unwrap_or(i64::max_value());
|
||||
let d = r.duration_90k as i64;
|
||||
if s.start_time <= cur_off + d && cur_off < end_time {
|
||||
let start = cmp::max(0, s.start_time - cur_off);
|
||||
let end = cmp::min(d, end_time - cur_off);
|
||||
let times = start as i32 .. end as i32;
|
||||
debug!("...appending recording {}/{} with times {:?} (out of dur {})",
|
||||
r.camera_id, r.id, times, d);
|
||||
builder.append(&db, r, start as i32 .. end as i32)?;
|
||||
} else {
|
||||
debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d);
|
||||
if let Some(end) = s.end_time {
|
||||
if end > cur_off {
|
||||
return Err(Error::new(
|
||||
format!("end time {} is beyond specified recordings", end)));
|
||||
}
|
||||
}
|
||||
cur_off += d;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// Check for missing recordings.
|
||||
match prev {
|
||||
Some(id) if s.ids.end != id + 1 => {
|
||||
return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, s.ids.end - 1)));
|
||||
},
|
||||
None => {
|
||||
return Err(Error::new(format!("no such recording {}/{}",
|
||||
camera_id, s.ids.start)));
|
||||
},
|
||||
_ => {},
|
||||
};
|
||||
if let Some(end) = s.end_time {
|
||||
if end > cur_off {
|
||||
return Err(Error::new(
|
||||
format!("end time {} is beyond specified recordings", end)));
|
||||
}
|
||||
}
|
||||
},
|
||||
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
|
||||
_ => return Err(Error::new(format!("parameter {} not understood", key))),
|
||||
}
|
||||
};
|
||||
},
|
||||
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
|
||||
_ => return Err(Error::new(format!("parameter {} not understood", key))),
|
||||
}
|
||||
};
|
||||
}
|
||||
let mp4 = builder.build(self.db.clone(), self.dir.clone())?;
|
||||
http_entity::serve(&mp4, req, res)?;
|
||||
Ok(())
|
||||
Ok(http_entity::serve(mp4, req))
|
||||
}
|
||||
|
||||
/// Parses optional `start_time_90k` and `end_time_90k` query parameters, defaulting to the
|
||||
/// full range of possible values.
|
||||
fn get_optional_range(query: &str) -> Result<Range<recording::Time>, Error> {
|
||||
fn get_optional_range(query: Option<&str>) -> Result<Range<recording::Time>, Error> {
|
||||
let mut start = i64::min_value();
|
||||
let mut end = i64::max_value();
|
||||
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"start_time_90k" => start = i64::from_str(value)?,
|
||||
"end_time_90k" => end = i64::from_str(value)?,
|
||||
_ => {},
|
||||
}
|
||||
};
|
||||
if let Some(q) = query {
|
||||
for (key, value) in form_urlencoded::parse(q.as_bytes()) {
|
||||
let (key, value) = (key.borrow(), value.borrow());
|
||||
match key {
|
||||
"start_time_90k" => start = i64::from_str(value)?,
|
||||
"end_time_90k" => end = i64::from_str(value)?,
|
||||
_ => {},
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(recording::Time(start) .. recording::Time(end))
|
||||
}
|
||||
}
|
||||
|
||||
impl server::Handler for Handler {
|
||||
fn handle(&self, req: server::Request, res: server::Response) {
|
||||
let (path, query) = get_path_and_query(&req.uri);
|
||||
let res = match decode_path(path) {
|
||||
Path::CamerasList => self.list_cameras(&req, res),
|
||||
Path::Camera(uuid) => self.camera(uuid, query, &req, res),
|
||||
Path::CameraRecordings(uuid) => self.camera_recordings(uuid, query, &req, res),
|
||||
Path::CameraViewMp4(uuid) => self.camera_view_mp4(uuid, query, &req, res),
|
||||
Path::NotFound => self.not_found(res),
|
||||
impl server::Service for Service {
|
||||
type Request = Request;
|
||||
type Response = Response<slices::Body>;
|
||||
type Error = hyper::Error;
|
||||
type Future = future::FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn call(&self, req: Request) -> Self::Future {
|
||||
debug!("request on: {}", req.uri());
|
||||
let res = match decode_path(req.uri().path()) {
|
||||
Path::CamerasList => self.list_cameras(&req),
|
||||
Path::Camera(uuid) => self.camera(uuid, req.uri().query(), &req),
|
||||
Path::CameraRecordings(uuid) => self.camera_recordings(uuid, req.uri().query(), &req),
|
||||
Path::CameraViewMp4(uuid) => self.camera_view_mp4(uuid, req.uri().query(), &req),
|
||||
Path::NotFound => self.not_found(),
|
||||
};
|
||||
if let Err(ref e) = res {
|
||||
warn!("Error handling request: {}", e);
|
||||
}
|
||||
future::result(res.map_err(|e| {
|
||||
error!("error: {}", e);
|
||||
hyper::Error::Incomplete
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -599,6 +610,7 @@ mod tests {
|
||||
|
||||
#[cfg(all(test, feature="nightly"))]
|
||||
mod bench {
|
||||
extern crate reqwest;
|
||||
extern crate test;
|
||||
|
||||
use hyper;
|
||||
@@ -612,18 +624,20 @@ mod bench {
|
||||
|
||||
impl Server {
|
||||
fn new() -> Server {
|
||||
let mut listener = hyper::net::HttpListener::new("127.0.0.1:0").unwrap();
|
||||
use hyper::net::NetworkListener;
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let server = hyper::Server::new(listener);
|
||||
let url = format!("http://{}:{}", addr.ip(), addr.port());
|
||||
let db = TestDb::new();
|
||||
testutil::add_dummy_recordings_to_db(&db.db, 1440);
|
||||
let (tx, rx) = ::std::sync::mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let h = super::Handler::new(db.db.clone(), db.dir.clone());
|
||||
let _ = server.handle(h);
|
||||
let addr = "127.0.0.1:0".parse().unwrap();
|
||||
let (db, dir) = (db.db.clone(), db.dir.clone());
|
||||
let server = hyper::server::Http::new()
|
||||
.bind(&addr, move || Ok(super::Service::new(db.clone(), dir.clone())))
|
||||
.unwrap();
|
||||
tx.send(server.local_addr().unwrap()).unwrap();
|
||||
server.run().unwrap();
|
||||
});
|
||||
Server{base_url: url}
|
||||
let addr = rx.recv().unwrap();
|
||||
Server{base_url: format!("http://{}:{}", addr.ip(), addr.port())}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -635,13 +649,13 @@ mod bench {
|
||||
fn serve_camera_html(b: &mut Bencher) {
|
||||
testutil::init();
|
||||
let server = &*SERVER;
|
||||
let url = hyper::Url::parse(&format!("{}/cameras/{}/", server.base_url,
|
||||
*testutil::TEST_CAMERA_UUID)).unwrap();
|
||||
let url = reqwest::Url::parse(&format!("{}/cameras/{}/", server.base_url,
|
||||
*testutil::TEST_CAMERA_UUID)).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
b.iter(|| {
|
||||
let client = hyper::Client::new();
|
||||
let client = reqwest::Client::new().unwrap();
|
||||
let mut resp = client.get(url.clone()).send().unwrap();
|
||||
assert_eq!(resp.status, hyper::status::StatusCode::Ok);
|
||||
assert_eq!(*resp.status(), reqwest::StatusCode::Ok);
|
||||
buf.clear();
|
||||
use std::io::Read;
|
||||
resp.read_to_end(&mut buf).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user