Mirror of https://github.com/scottlamb/moonfire-nvr.git, synced 2025-11-23 02:57:43 -05:00.
upgrade to hyper 0.12.x
Just one (intentional) functional change: the streamers now start shutting down while the webserver is still shutting down gracefully.
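In sketch form, the new shutdown sequence in `run()` looks like this (condensed from the diff below; `addr`, `s`, `shutdown_streamers`, and the elided streamer-join loop come from the surrounding code, and error handling is simplified):

    // Serve HTTP from a dedicated thread running the tokio runtime.
    let server = ::hyper::server::Server::bind(&addr)
        .tcp_nodelay(true)
        .serve(move || Ok::<_, Box<StdError + Send + Sync>>(s.clone()));

    // One shared SIGINT/SIGTERM future; both the hyper server and the main thread wait on it.
    let shutdown = setup_shutdown().shared();

    let reactor = ::std::thread::spawn({
        let shutdown = shutdown.clone();
        || tokio::run(server.with_graceful_shutdown(shutdown.map(|_| ()))
                      .map_err(|e| error!("hyper error: {}", e)))
    });

    // The main thread unblocks as soon as the signal fires, so the streamers start
    // shutting down while hyper is still draining in-flight requests on the reactor thread.
    shutdown.wait().unwrap();
    shutdown_streamers.store(true, Ordering::SeqCst);
    // ... join streamer threads (elided) ...
    reactor.join().unwrap();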
100  src/body.rs  Normal file
@@ -0,0 +1,100 @@
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2018 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

//! Tools for implementing a `http_serve::Entity` body composed from many "slices".

use failure::Error;
use futures::{Stream, stream};
use hyper::body::Payload;
use reffers::ARefs;
use std::error::Error as StdError;

pub struct Chunk(ARefs<'static, [u8]>);

//pub type CompatError = ::failure::Compat<Error>;
pub type BoxedError = Box<StdError + Send + Sync>;
pub type BodyStream = Box<Stream<Item = Chunk, Error = BoxedError> + Send + 'static>;

pub fn wrap_error(e: Error) -> BoxedError {
    Box::new(e.compat())
}

impl From<ARefs<'static, [u8]>> for Chunk {
    fn from(r: ARefs<'static, [u8]>) -> Self { Chunk(r) }
}

impl From<&'static [u8]> for Chunk {
    fn from(r: &'static [u8]) -> Self { Chunk(ARefs::new(r)) }
}

impl From<Vec<u8>> for Chunk {
    fn from(r: Vec<u8>) -> Self { Chunk(ARefs::new(r).map(|v| &v[..])) }
}

impl ::bytes::Buf for Chunk {
    fn remaining(&self) -> usize { self.0.len() }
    fn bytes(&self) -> &[u8] { &*self.0 }
    fn advance(&mut self, cnt: usize) {
        self.0 = ::std::mem::replace(&mut self.0, ARefs::new(&[][..])).map(|b| &b[cnt..]);
    }
}

pub struct Body(BodyStream);

impl Payload for Body {
    type Data = Chunk;
    type Error = BoxedError;

    fn poll_data(&mut self) -> ::futures::Poll<Option<Self::Data>, Self::Error> {
        self.0.poll()
    }
}

impl From<BodyStream> for Body {
    fn from(b: BodyStream) -> Self { Body(b) }
}

impl From<&'static [u8]> for Body {
    fn from(c: &'static [u8]) -> Self {
        Body(Box::new(stream::once(Ok(c.into()))))
    }
}

impl From<Error> for Body {
    fn from(e: Error) -> Self {
        Body(Box::new(stream::once(Err(wrap_error(e)))))
    }
}

//impl<C: Into<Chunk>> From<C> for Body {
//    fn from(c: C) -> Self {
//        Body(Box::new(stream::once(Ok(c.into()))))
//    }
//}
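As a hypothetical illustration (not part of the commit) of how these conversions are used in the rest of the change, a `Body` can be built directly from a static byte slice or from a `failure::Error`:

    // Assumes `body::Body` and the failure crate's `format_err!` macro are in scope.
    let ok: Body = (&b"not found"[..]).into();     // via From<&'static [u8]> for Body
    let err: Body = format_err!("oops").into();    // via From<Error>, boxed through wrap_error()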
src/cmds/run.rs

@@ -33,12 +33,13 @@ use db::{self, dir, writer};
use failure::Error;
use fnv::FnvHashMap;
use futures::{Future, Stream};
use std::error::Error as StdError;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use stream;
use streamer;
use tokio_core::reactor;
use tokio;
use tokio_signal::unix::{Signal, SIGINT, SIGTERM};
use web;

@@ -78,12 +79,12 @@ struct Args {
    flag_allow_origin: Option<String>,
}

fn setup_shutdown_future(h: &reactor::Handle) -> Box<Future<Item = (), Error = ()>> {
    let int = Signal::new(SIGINT, h).flatten_stream().into_future();
    let term = Signal::new(SIGTERM, h).flatten_stream().into_future();
    Box::new(int.select(term)
             .map(|_| ())
             .map_err(|_| ()))
fn setup_shutdown() -> impl Future<Item = (), Error = ()> + Send {
    let int = Signal::new(SIGINT).flatten_stream().into_future();
    let term = Signal::new(SIGTERM).flatten_stream().into_future();
    int.select(term)
        .map(|_| ())
        .map_err(|_| ())
}

fn resolve_zone() -> String {

@@ -193,14 +194,18 @@ pub fn run() -> Result<(), Error> {

    // Start the web interface.
    let addr = args.flag_http_addr.parse().unwrap();
    let server = ::hyper::server::Http::new()
        .bind(&addr, move || Ok(s.clone()))
        .unwrap();
    let server = ::hyper::server::Server::bind(&addr).tcp_nodelay(true).serve(
        move || Ok::<_, Box<StdError + Send + Sync>>(s.clone()));

    let shutdown = setup_shutdown_future(&server.handle());
    let shutdown = setup_shutdown().shared();

    info!("Ready to serve HTTP requests");
    server.run_until(shutdown).unwrap();
    let reactor = ::std::thread::spawn({
        let shutdown = shutdown.clone();
        || tokio::run(server.with_graceful_shutdown(shutdown.map(|_| ()))
                      .map_err(|e| error!("hyper error: {}", e)))
    });
    shutdown.wait().unwrap();

    info!("Shutting down streamers.");
    shutdown_streamers.store(true, Ordering::SeqCst);

@@ -218,6 +223,8 @@ pub fn run() -> Result<(), Error> {
        }
    }

    info!("Waiting for HTTP requests to finish.");
    reactor.join().unwrap();
    info!("Exiting.");
    Ok(())
}

src/main.rs

@@ -47,7 +47,6 @@ extern crate libc;
extern crate reffers;
extern crate rusqlite;
extern crate memmap;
extern crate mime;
extern crate moonfire_base as base;
extern crate moonfire_db as db;
extern crate moonfire_ffmpeg;

@@ -60,13 +59,14 @@ extern crate serde;
extern crate serde_json;
extern crate smallvec;
extern crate time;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_signal;
extern crate url;
extern crate uuid;

use base::clock as clock;

mod body;
mod cmds;
mod h264;
mod json;

131  src/mp4.rs
@@ -79,20 +79,22 @@
extern crate time;

use base::strutil;
use bytes::BytesMut;
use bytes::{Buf, BytesMut};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use body::{Chunk, BoxedError, wrap_error};
use db::recording::{self, TIME_UNITS_PER_SEC};
use db::{self, dir};
use failure::Error;
use futures::Stream;
use futures::stream;
use http;
use http::header::HeaderValue;
use http_serve;
use hyper::header;
use memmap;
use openssl::hash;
use parking_lot::{Once, ONCE_INIT};
use reffers::ARefs;
use slices::{self, Body, Chunk, Slices};
use slices::{self, Slices};
use smallvec::SmallVec;
use std::cell::UnsafeCell;
use std::cmp;

@@ -101,6 +103,7 @@ use std::io;
use std::ops::Range;
use std::mem;
use std::sync::Arc;
use std::time::SystemTime;

/// This value should be incremented any time a change is made to this file that causes different
/// bytes to be output for a particular set of `Mp4Builder` options. Incrementing this value will
@@ -615,7 +618,7 @@ impl Slice {
        let mp4 = ARefs::new(mp4.0.clone());
        let r = r.start as usize .. r.end as usize;
        let p = self.p();
        mp4.try_map(|mp4| Ok(&mp4.segments[p].get_index(&mp4.db, f)?[r]))
        Ok(mp4.try_map(|mp4| Ok::<_, Error>(&mp4.segments[p].get_index(&mp4.db, f)?[r]))?.into())
    }

    fn wrap_truns(&self, mp4: &File, r: Range<u64>, len: usize) -> Result<Chunk, Error> {

@@ -629,16 +632,17 @@ impl Slice {
        mp4.0.db.lock()
            .with_recording_playback(s.s.id, &mut |playback| s.truns(playback, pos, len))?;
        let truns = ARefs::new(truns);
        Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]))
        Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]).into())
    }
}

impl slices::Slice for Slice {
    type Ctx = File;
    type Chunk = slices::Chunk;
    type Chunk = Chunk;

    fn end(&self) -> u64 { return self.0 & 0xFF_FF_FF_FF_FF }
    fn get_range(&self, f: &File, range: Range<u64>, len: u64) -> Body {
    fn get_range(&self, f: &File, range: Range<u64>, len: u64)
                 -> Box<Stream<Item = Self::Chunk, Error = BoxedError> + Send> {
        trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len);
        let p = self.p();
        let res = match self.t() {

@@ -649,12 +653,12 @@ impl slices::Slice for Slice {
            },
            SliceType::Buf => {
                let r = ARefs::new(f.0.clone());
                Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]))
                Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]).into())
            },
            SliceType::VideoSampleEntry => {
                let r = ARefs::new(f.0.clone());
                Ok(r.map(|f| &f.video_sample_entries[p]
                         .data[range.start as usize .. range.end as usize]))
                         .data[range.start as usize .. range.end as usize]).into())
            },
            SliceType::Stts => self.wrap_index(f, range.clone(), &Segment::stts),
            SliceType::Stsz => self.wrap_index(f, range.clone(), &Segment::stsz),

@@ -665,15 +669,12 @@ impl slices::Slice for Slice {
            SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize),
        };
        Box::new(stream::once(res
            .map_err(|e| {
                error!("Error producing {:?}: {:?}", self, e);
                ::hyper::Error::Incomplete
            })
            .map_err(|e| wrap_error(e))
            .and_then(move |c| {
                if c.len() != (range.end - range.start) as usize {
                    error!("Error producing {:?}: range {:?} produced incorrect len {}.",
                           self, range, c.len());
                    return Err(::hyper::Error::Incomplete);
                if c.remaining() != (range.end - range.start) as usize {
                    return Err(wrap_error(format_err!(
                        "Error producing {:?}: range {:?} produced incorrect len {}.",
                        self, range, c.remaining())));
                }
                Ok(c)
            })))
@@ -868,8 +869,8 @@ impl FileBuilder {
        }
        debug!("segments: {:#?}", self.segments);
        debug!("slices: {:?}", self.body.slices);
        let mtime = ::std::time::UNIX_EPOCH +
                    ::std::time::Duration::from_secs(max_end as u64);
        let last_modified = ::std::time::UNIX_EPOCH +
                            ::std::time::Duration::from_secs(max_end as u64);
        Ok(File(Arc::new(FileInner {
            db,
            dirs_by_stream_id,

@@ -878,8 +879,9 @@ impl FileBuilder {
            buf: self.body.buf,
            video_sample_entries: self.video_sample_entries,
            initial_sample_byte_pos,
            last_modified: mtime.into(),
            etag: header::EntityTag::strong(strutil::hex(&etag.finish()?)),
            last_modified,
            etag: HeaderValue::from_str(&format!("\"{}\"", &strutil::hex(&etag.finish()?)))
                .expect("hex string should be valid UTF-8"),
        })))
    }

@@ -1420,8 +1422,8 @@ struct FileInner {
    buf: Vec<u8>,
    video_sample_entries: SmallVec<[Arc<db::VideoSampleEntry>; 1]>,
    initial_sample_byte_pos: u64,
    last_modified: header::HttpDate,
    etag: header::EntityTag,
    last_modified: SystemTime,
    etag: HeaderValue,
}

impl FileInner {

@@ -1433,7 +1435,7 @@ impl FileInner {
            let r = s.s.sample_file_range();
            pos += r.end - r.start;
        }
        Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]))
        Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
    }

    /// Gets a `Chunk` of video sample data from disk.

@@ -1459,7 +1461,7 @@ impl FileInner {
                .map(&f)?
        });
        use core::ops::Deref;
        Ok(ARefs::new(mmap).map(|m| m.deref()))
        Ok(ARefs::new(mmap).map(|m| m.deref()).into())
    }

    fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, l: u64) -> Result<Chunk, Error> {

@@ -1475,7 +1477,7 @@ impl FileInner {
            use std::io::Write;
            write!(v, "{}", tm.strftime(SUBTITLE_TEMPLATE)?)?;
        }
        Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]))
        Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
    }
}

@@ -1483,8 +1485,8 @@ impl FileInner {
pub struct File(Arc<FileInner>);

impl http_serve::Entity for File {
    type Chunk = slices::Chunk;
    type Body = slices::Body;
    type Data = Chunk;
    type Error = BoxedError;

    fn add_headers(&self, hdrs: &mut http::header::HeaderMap) {
        let mut mime = BytesMut::with_capacity(64);

@@ -1502,10 +1504,13 @@ impl http_serve::Entity for File {
        hdrs.insert(http::header::CONTENT_TYPE,
                    http::header::HeaderValue::from_shared(mime.freeze()).unwrap());
    }
    fn last_modified(&self) -> Option<header::HttpDate> { Some(self.0.last_modified) }
    fn etag(&self) -> Option<header::EntityTag> { Some(self.0.etag.clone()) }
    fn last_modified(&self) -> Option<SystemTime> { Some(self.0.last_modified) }
    fn etag(&self) -> Option<HeaderValue> { Some(self.0.etag.clone()) }
    fn len(&self) -> u64 { self.0.slices.len() }
    fn get_range(&self, range: Range<u64>) -> Body { self.0.slices.get_range(self, range) }
    fn get_range(&self, range: Range<u64>)
                 -> Box<Stream<Item = Self::Data, Error = Self::Error> + Send> {
        self.0.slices.get_range(self, range)
    }
}

/// Tests. There are two general strategies used to validate the resulting files:

@@ -1521,6 +1526,7 @@ impl http_serve::Entity for File {
#[cfg(test)]
mod tests {
    use base::strutil;
    use bytes::Buf;
    use byteorder::{BigEndian, ByteOrder};
    use clock::RealClocks;
    use db::recording::{self, TIME_UNITS_PER_SEC};

@@ -1528,7 +1534,6 @@ mod tests {
    use db::writer;
    use futures::Future;
    use futures::Stream as FuturesStream;
    use hyper::header;
    use openssl::hash;
    use http_serve::{self, Entity};
    use std::fs;
@@ -1538,26 +1543,28 @@ mod tests {
    use super::*;
    use stream::{self, Opener, Stream};

    fn fill_slice<E: http_serve::Entity>(slice: &mut [u8], e: &E, start: u64) {
    fn fill_slice<E: http_serve::Entity>(slice: &mut [u8], e: &E, start: u64)
    where E::Error : ::std::fmt::Debug {
        let mut p = 0;
        e.get_range(start .. start + slice.len() as u64)
            .for_each(|chunk| {
                let c: &[u8] = chunk.as_ref();
                let c: &[u8] = chunk.bytes();
                slice[p .. p + c.len()].copy_from_slice(c);
                p += c.len();
                Ok::<_, ::hyper::Error>(())
                Ok::<_, E::Error>(())
            })
            .wait()
            .unwrap();
    }

    /// Returns the SHA-1 digest of the given `Entity`.
    fn digest<E: http_serve::Entity>(e: &E) -> hash::DigestBytes {
    fn digest<E: http_serve::Entity>(e: &E) -> hash::DigestBytes
    where E::Error : ::std::fmt::Debug {
        e.get_range(0 .. e.len())
            .fold(hash::Hasher::new(hash::MessageDigest::sha1()).unwrap(), |mut sha1, chunk| {
                let c: &[u8] = chunk.as_ref();
                let c: &[u8] = chunk.bytes();
                sha1.update(c).unwrap();
                Ok::<_, ::hyper::Error>(sha1)
                Ok::<_, E::Error>(sha1)
            })
            .wait()
            .unwrap()

@@ -1812,7 +1819,7 @@ mod tests {
        use ::std::io::Write;
        mp4.get_range(0 .. mp4.len())
            .for_each(|chunk| {
                out.write_all(&chunk)?;
                out.write_all(chunk.bytes())?;
                Ok(())
            })
            .wait()
@@ -2116,8 +2123,8 @@ mod tests {
        // combine ranges from the new format with ranges from the old format.
        let sha1 = digest(&mp4);
        assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..]));
        const EXPECTED_ETAG: &'static str = "04298efb2df0cc45a6cea65dfdf2e817a3b42ca8";
        assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
        const EXPECTED_ETAG: &'static str = "\"04298efb2df0cc45a6cea65dfdf2e817a3b42ca8\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
        db.syncer_join.join().unwrap();

@@ -2137,8 +2144,8 @@ mod tests {
        // combine ranges from the new format with ranges from the old format.
        let sha1 = digest(&mp4);
        assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..]));
        const EXPECTED_ETAG: &'static str = "16a4f6348560c3de0d149675dccba21ef7906be3";
        assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
        const EXPECTED_ETAG: &'static str = "\"16a4f6348560c3de0d149675dccba21ef7906be3\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
        db.syncer_join.join().unwrap();

@@ -2158,8 +2165,8 @@ mod tests {
        // combine ranges from the new format with ranges from the old format.
        let sha1 = digest(&mp4);
        assert_eq!("d655945f94e18e6ed88a2322d27522aff6f76403", strutil::hex(&sha1[..]));
        const EXPECTED_ETAG: &'static str = "80e418b029e81aa195f90aa6b806015a5030e5be";
        assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
        const EXPECTED_ETAG: &'static str = "\"80e418b029e81aa195f90aa6b806015a5030e5be\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
        db.syncer_join.join().unwrap();

@@ -2179,8 +2186,8 @@ mod tests {
        // combine ranges from the new format with ranges from the old format.
        let sha1 = digest(&mp4);
        assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..]));
        const EXPECTED_ETAG: &'static str = "5bfea0f20108a7c5b77ef1e21d82ef2abc29540f";
        assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
        const EXPECTED_ETAG: &'static str = "\"5bfea0f20108a7c5b77ef1e21d82ef2abc29540f\"";
        assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
        drop(db.syncer_channel);
        db.db.lock().clear_on_flush();
        db.syncer_join.join().unwrap();

@@ -2195,12 +2202,11 @@ mod bench {
    use base::clock::RealClocks;
    use db::recording;
    use db::testutil::{self, TestDb};
    use futures::Stream;
    use futures::future;
    use futures::{Future, future};
    use hyper;
    use http_serve;
    use reffers::ARefs;
    use self::test::Bencher;
    use std::error::Error as StdError;
    use super::tests::create_mp4_from_db;
    use url::Url;

@@ -2225,14 +2231,14 @@ mod bench {
        let (tx, rx) = ::std::sync::mpsc::channel();
        ::std::thread::spawn(move || {
            let addr = "127.0.0.1:0".parse().unwrap();
            let server = hyper::server::Http::new()
                .bind(&addr, move || Ok(MyService(mp4.clone())))
                .unwrap();
            tx.send(server.local_addr().unwrap()).unwrap();
            server.run().unwrap();
            let server = hyper::server::Server::bind(&addr)
                .tcp_nodelay(true)
                .serve(move || Ok::<_, Box<StdError + Send + Sync>>(MyService(mp4.clone())));
            tx.send(server.local_addr()).unwrap();
            ::tokio::run(server.map_err(|e| panic!(e)));
        });
        let addr = rx.recv().unwrap();
        BenchServer{
        BenchServer {
            url: Url::parse(&format!("http://{}:{}/", addr.ip(), addr.port())).unwrap(),
            generated_len: p,
        }

@@ -2241,14 +2247,13 @@ mod bench {

    struct MyService(super::File);

    impl hyper::server::Service for MyService {
        type Request = hyper::server::Request;
        type Response = hyper::server::Response<
            Box<Stream<Item = ARefs<'static, [u8]>, Error = hyper::Error> + Send>>;
        type Error = hyper::Error;
        type Future = future::FutureResult<Self::Response, Self::Error>;
    impl hyper::service::Service for MyService {
        type ReqBody = ::hyper::Body;
        type ResBody = ::body::Body;
        type Error = ::body::BoxedError;
        type Future = future::FutureResult<::http::Response<Self::ResBody>, Self::Error>;

        fn call(&self, req: hyper::server::Request) -> Self::Future {
        fn call(&mut self, req: ::http::Request<Self::ReqBody>) -> Self::Future {
            future::ok(http_serve::serve(self.0.clone(), &req))
        }
    }

src/slices.rs

@@ -30,16 +30,13 @@

//! Tools for implementing a `http_serve::Entity` body composed from many "slices".

use body::{BoxedError, wrap_error};
use failure::Error;
use futures::stream;
use futures::Stream;
use reffers::ARefs;
use std::fmt;
use std::ops::Range;

pub type Chunk = ARefs<'static, [u8]>;
pub type Body = Box<Stream<Item = Chunk, Error = ::hyper::Error> + Send>;

/// Writes a byte range to the given `io::Write` given a context argument; meant for use with
/// `Slices`.
pub trait Slice : fmt::Debug + Sized + Sync + 'static {

@@ -54,7 +51,7 @@ pub trait Slice : fmt::Debug + Sized + Sync + 'static {
    /// The additional argument `ctx` is as supplied to the `Slices`.
    /// The additional argument `l` is the length of this slice, as determined by the `Slices`.
    fn get_range(&self, ctx: &Self::Ctx, r: Range<u64>, len: u64)
                 -> Box<Stream<Item = Self::Chunk, Error = ::hyper::Error> + Send>;
                 -> Box<Stream<Item = Self::Chunk, Error = BoxedError> + Send>;

    fn get_slices(ctx: &Self::Ctx) -> &Slices<Self>;
}

@@ -113,10 +110,10 @@ impl<S> Slices<S> where S: Slice {
    /// Writes `range` to `out`.
    /// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument.
    pub fn get_range(&self, ctx: &S::Ctx, range: Range<u64>)
                     -> Box<Stream<Item = S::Chunk, Error = ::hyper::Error> + Send> {
                     -> Box<Stream<Item = S::Chunk, Error = BoxedError> + Send> {
        if range.start > range.end || range.end > self.len {
            error!("Bad range {:?} for slice of length {}", range, self.len);
            return Box::new(stream::once(Err(::hyper::Error::Incomplete)));
            return Box::new(stream::once(Err(wrap_error(format_err!(
                "Bad range {:?} for slice of length {}", range, self.len)))));
        }

        // Binary search for the first slice of the range to write, determining its index and

@@ -143,7 +140,7 @@ impl<S> Slices<S> where S: Slice {
            let l = s_end - slice_start;
            body = s.get_range(&c, start_pos .. min_end - slice_start, l);
        };
        Some(Ok::<_, ::hyper::Error>((body, (c, i+1, 0, min_end))))
        Some(Ok::<_, BoxedError>((body, (c, i+1, 0, min_end))))
    });
    Box::new(bodies.flatten())
}

@@ -151,6 +148,7 @@ impl<S> Slices<S> where S: Slice {

#[cfg(test)]
mod tests {
    use body::BoxedError;
    use db::testutil;
    use futures::{Future, Stream};
    use futures::stream;

@@ -176,7 +174,7 @@ mod tests {
    fn end(&self) -> u64 { self.end }

    fn get_range(&self, _ctx: &&'static Slices<FakeSlice>, r: Range<u64>, _l: u64)
                 -> Box<Stream<Item = FakeChunk, Error = ::hyper::Error> + Send> {
                 -> Box<Stream<Item = FakeChunk, Error = BoxedError> + Send> {
        Box::new(stream::once(Ok(FakeChunk{slice: self.name, range: r})))
    }

110  src/web.rs
@@ -31,25 +31,22 @@
extern crate hyper;

use base::strutil;
use body::{Body, BoxedError, wrap_error};
use core::borrow::Borrow;
use core::str::FromStr;
use db::{self, recording};
use db::dir::SampleFileDir;
use failure::Error;
use fnv::FnvHashMap;
use futures::{future, stream};
use futures::future;
use futures_cpupool;
use json;
use http;
use http::{self, Request, Response, status::StatusCode};
use http_serve;
use hyper::header::{self, Header};
use hyper::server::{self, Request, Response};
use mime;
use http::header::{self, HeaderValue};
use mp4;
use reffers::ARefs;
use regex::Regex;
use serde_json;
use slices;
use std::collections::HashMap;
use std::cmp;
use std::fs;
@@ -185,7 +182,7 @@ impl Segments {
/// The files themselves are opened on every request so they can be changed during development.
#[derive(Debug)]
struct UiFile {
    mime: http::header::HeaderValue,
    mime: HeaderValue,
    path: PathBuf,
}

@@ -193,21 +190,21 @@ struct ServiceInner {
    db: Arc<db::Database>,
    dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
    ui_files: HashMap<String, UiFile>,
    allow_origin: Option<header::AccessControlAllowOrigin>,
    allow_origin: Option<HeaderValue>,
    pool: futures_cpupool::CpuPool,
    time_zone_name: String,
}

impl ServiceInner {
    fn not_found(&self) -> Result<Response<slices::Body>, Error> {
        let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(&b"not found"[..]))));
        Ok(Response::new()
            .with_status(hyper::StatusCode::NotFound)
            .with_header(header::ContentType(mime::TEXT_PLAIN))
            .with_body(body))
    fn not_found(&self) -> Result<Response<Body>, Error> {
        let body: Body = (&b"not found"[..]).into();
        Ok(Response::builder()
            .status(StatusCode::NOT_FOUND)
            .header(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))
            .body(body)?)
    }

    fn top_level(&self, req: &Request) -> Result<Response<slices::Body>, Error> {
    fn top_level(&self, req: &Request<::hyper::Body>) -> Result<Response<Body>, Error> {
        let mut days = false;
        if let Some(q) = req.uri().query() {
            for (key, value) in form_urlencoded::parse(q.as_bytes()) {
@@ -219,8 +216,10 @@ impl ServiceInner {
            }
        }

        let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON));
        if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() {
        let (mut resp, writer) = http_serve::streaming_body(&req).build();
        resp.headers_mut().insert(header::CONTENT_TYPE,
                                  HeaderValue::from_static("application/json"));
        if let Some(mut w) = writer {
            let db = self.db.lock();
            serde_json::to_writer(&mut w, &json::TopLevel {
                time_zone_name: &self.time_zone_name,

@@ -230,9 +229,11 @@ impl ServiceInner {
        Ok(resp)
    }

    fn camera(&self, req: &Request, uuid: Uuid) -> Result<Response<slices::Body>, Error> {
        let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON));
        if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() {
    fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> Result<Response<Body>, Error> {
        let (mut resp, writer) = http_serve::streaming_body(&req).build();
        resp.headers_mut().insert(header::CONTENT_TYPE,
                                  HeaderValue::from_static("application/json"));
        if let Some(mut w) = writer {
            let db = self.db.lock();
            let camera = db.get_camera(uuid)
                .ok_or_else(|| format_err!("no such camera {}", uuid))?;

@@ -241,8 +242,8 @@ impl ServiceInner {
        Ok(resp)
    }

    fn stream_recordings(&self, req: &Request, uuid: Uuid, type_: db::StreamType)
                         -> Result<Response<slices::Body>, Error> {
    fn stream_recordings(&self, req: &Request<::hyper::Body>, uuid: Uuid, type_: db::StreamType)
                         -> Result<Response<Body>, Error> {
        let (r, split) = {
            let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
            let mut split = recording::Duration(i64::max_value());
@@ -286,14 +287,17 @@ impl ServiceInner {
                Ok(())
            })?;
        }
        let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON));
        if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() {
        let (mut resp, writer) = http_serve::streaming_body(&req).build();
        resp.headers_mut().insert(header::CONTENT_TYPE,
                                  HeaderValue::from_static("application/json"));
        if let Some(mut w) = writer {
            serde_json::to_writer(&mut w, &out)?
        };
        Ok(resp)
    }

    fn init_segment(&self, sha1: [u8; 20], req: &Request) -> Result<Response<slices::Body>, Error> {
    fn init_segment(&self, sha1: [u8; 20], req: &Request<::hyper::Body>)
                    -> Result<Response<Body>, Error> {
        let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment);
        let db = self.db.lock();
        for ent in db.video_sample_entries_by_id().values() {

@@ -306,8 +310,9 @@ impl ServiceInner {
        self.not_found()
    }

    fn stream_view_mp4(&self, req: &Request, uuid: Uuid, stream_type_: db::StreamType,
                       mp4_type_: mp4::Type) -> Result<Response<slices::Body>, Error> {
    fn stream_view_mp4(&self, req: &Request<::hyper::Body>, uuid: Uuid,
                       stream_type_: db::StreamType, mp4_type_: mp4::Type)
                       -> Result<Response<Body>, Error> {
        let stream_id = {
            let db = self.db.lock();
            let camera = db.get_camera(uuid)
@@ -403,14 +408,14 @@ impl ServiceInner {
        Ok(http_serve::serve(mp4, req))
    }

    fn static_file(&self, req: &Request) -> Result<Response<slices::Body>, Error> {
    fn static_file(&self, req: &Request<::hyper::Body>) -> Result<Response<Body>, Error> {
        let s = match self.ui_files.get(req.uri().path()) {
            None => { return self.not_found() },
            Some(s) => s,
        };
        let f = fs::File::open(&s.path)?;
        let mut hdrs = http::HeaderMap::new();
        hdrs.insert(http::header::CONTENT_TYPE, s.mime.clone());
        hdrs.insert(header::CONTENT_TYPE, s.mime.clone());
        let e = http_serve::ChunkedReadFile::new(f, Some(self.pool.clone()), hdrs)?;
        Ok(http_serve::serve(e, &req))
    }

@@ -445,7 +450,7 @@ impl Service {
        };
        let allow_origin = match allow_origin {
            None => None,
            Some(o) => Some(header::AccessControlAllowOrigin::parse_header(&header::Raw::from(o))?),
            Some(o) => Some(HeaderValue::from_str(&o)?),
        };
        Ok(Service(Arc::new(ServiceInner {
            db,
@@ -492,22 +497,22 @@ impl Service {
                },
            };
            files.insert(p, UiFile {
                mime: http::header::HeaderValue::from_static(mime),
                mime: HeaderValue::from_static(mime),
                path: e.path(),
            });
        }
    }
}

impl server::Service for Service {
    type Request = Request;
    type Response = Response<slices::Body>;
    type Error = hyper::Error;
    type Future = future::FutureResult<Self::Response, Self::Error>;
impl ::hyper::service::Service for Service {
    type ReqBody = ::hyper::Body;
    type ResBody = Body;
    type Error = BoxedError;
    type Future = future::FutureResult<Response<Self::ResBody>, Self::Error>;

    fn call(&self, req: Request) -> Self::Future {
    fn call(&mut self, req: Request<::hyper::Body>) -> Self::Future {
        debug!("request on: {}", req.uri());
        let res = match decode_path(req.uri().path()) {
        let mut res = match decode_path(req.uri().path()) {
            Path::InitSegment(sha1) => self.0.init_segment(sha1, &req),
            Path::TopLevel => self.0.top_level(&req),
            Path::Camera(uuid) => self.0.camera(&req, uuid),
@@ -521,15 +526,12 @@ impl server::Service for Service {
            Path::NotFound => self.0.not_found(),
            Path::Static => self.0.static_file(&req),
        };
        let res = if let Some(ref o) = self.0.allow_origin {
            res.map(|resp| resp.with_header(o.clone()))
        } else {
            res
        };
        future::result(res.map_err(|e| {
            error!("error: {}", e);
            hyper::Error::Incomplete
        }))
        if let Ok(ref mut resp) = res {
            if let Some(ref o) = self.0.allow_origin {
                resp.headers_mut().insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, o.clone());
            }
        }
        future::result(res.map_err(|e| wrap_error(e)))
    }
}

@@ -570,8 +572,10 @@ mod bench {
    extern crate test;

    use db::testutil::{self, TestDb};
    use futures::Future;
    use hyper;
    use self::test::Bencher;
    use std::error::Error as StdError;
    use uuid::Uuid;

    struct Server {

@@ -589,11 +593,11 @@ mod bench {
            let addr = "127.0.0.1:0".parse().unwrap();
            let service = super::Service::new(db.db.clone(), None, None,
                                              "".to_owned()).unwrap();
            let server = hyper::server::Http::new()
                .bind(&addr, move || Ok(service.clone()))
                .unwrap();
            tx.send(server.local_addr().unwrap()).unwrap();
            server.run().unwrap();
            let server = hyper::server::Server::bind(&addr)
                .tcp_nodelay(true)
                .serve(move || Ok::<_, Box<StdError + Send + Sync>>(service.clone()));
            tx.send(server.local_addr()).unwrap();
            ::tokio::run(server.map_err(|e| panic!(e)));
        });
        let addr = rx.recv().unwrap();
        Server {