upgrade to hyper 0.13 ecosystem

This doesn't take much advantage of async fns so far. For example, the
with_{form,json}_body functions are still designed for use with future
combinators, when it would now be more natural to call them from async
fns. But it's a start.
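
A sketch of that more natural style (hypothetical, not part of this
commit; handle_login is an illustrative name, built on the handler
types in this diff):

    // Today the handler chains combinators onto the helper:
    //     wrap(true, with_form_body(req).and_then({
    //         let s = self.clone();
    //         move |(req, b)| future::ready(s.0.login(&req, b))
    //     }))
    // A follow-up could await the now-async helper directly instead:
    async fn handle_login(s: Service, req: Request<hyper::Body>)
        -> Result<Response<Body>, Response<Body>> {
        let (req, body) = with_form_body(req).await?;
        s.0.login(&req, body)
    }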

Similarly, this still uses the old version of reqwest. Small steps.

Requires Rust 1.40 now. (async fn support requires 1.39, and
http-serve 0.2.0 requires 1.40.)
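
For reference, the core of the new hyper 0.13 server wiring in this
change, condensed from the new async `run()` (graceful-shutdown
plumbing omitted):

    // hyper 0.13 builds one service per connection via make_service_fn;
    // each request goes through Service::serve, which returns a boxed
    // future that is pinned here to satisfy hyper's trait bounds.
    let make_svc = make_service_fn(move |_conn| {
        futures::future::ok::<_, std::convert::Infallible>(service_fn({
            let mut s = s.clone();
            move |req| Pin::from(s.serve(req))
        }))
    });
    let server = hyper::server::Server::bind(&addr)
        .tcp_nodelay(true)
        .serve(make_svc);
    let server_handle = tokio::spawn(server);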
Scott Lamb 2020-01-08 23:04:36 -08:00
parent fce0c5b014
commit 8af7bca6c2
10 changed files with 1844 additions and 1642 deletions


@ -23,7 +23,7 @@ matrix:
script:
- ci/script-rust.sh
- language: rust
rust: 1.36.0
rust: 1.40.0
script:
- ci/script-rust.sh
- language: node_js

Cargo.lock (generated): 2724 changed lines; diff suppressed because it is too large.


@ -20,7 +20,7 @@ members = ["base", "db", "ffmpeg"]
[dependencies]
base = { package = "moonfire-base", path = "base" }
base64 = "0.10.0"
bytes = "0.4.6"
bytes = "0.5.3"
byteorder = "1.0"
cstr = "0.1.7"
cursive = "0.12"
@ -28,12 +28,11 @@ db = { package = "moonfire-db", path = "db" }
docopt = "1.0"
failure = "0.1.1"
ffmpeg = { package = "moonfire-ffmpeg", path = "ffmpeg" }
futures = "0.1"
futures-cpupool = "0.1"
futures = "0.3"
fnv = "1.0"
http = "0.1.5"
http-serve = "0.1.0"
hyper = "0.12.9"
http = "0.2.0"
http-serve = "0.2.0"
hyper = "0.13.0"
lazy_static = "1.0"
libc = "0.2"
log = { version = "0.4", features = ["release_max_level_info"] }
@ -52,8 +51,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smallvec = "0.6"
time = "0.1"
tokio = "0.1.8"
tokio-signal = "0.2"
tokio = { version = "0.2.0", features = ["blocking", "macros", "rt-threaded", "signal"] }
url = "1.4"
uuid = { version = "0.7", features = ["serde", "std", "v4"] }


@ -48,7 +48,7 @@ $ sudo apt-get install \
tzdata
```
Next, you need Rust 1.36+ and Cargo. The easiest way to install them is by
Next, you need Rust 1.40+ and Cargo. The easiest way to install them is by
following the instructions at [rustup.rs](https://www.rustup.rs/).
Finally, building the UI requires [yarn](https://yarnpkg.com/en/).


@ -40,7 +40,7 @@ fi
NODE_MIN_VERSION="8"
YARN_MIN_VERSION="1.0"
CARGO_MIN_VERSION="0.2"
RUSTC_MIN_VERSION="1.36"
RUSTC_MIN_VERSION="1.40"
normalizeDirPath()
{


@ -32,77 +32,81 @@
use base::Error;
use futures::{Stream, stream};
use hyper::body::Payload;
use reffers::ARefs;
use reffers::ARefss;
use std::error::Error as StdError;
use std::pin::Pin;
pub struct Chunk(ARefs<'static, [u8]>);
pub struct Chunk(ARefss<'static, [u8]>);
//pub type CompatError = ::failure::Compat<Error>;
pub type BoxedError = Box<dyn StdError + Send + Sync>;
pub type BodyStream = Box<dyn Stream<Item = Chunk, Error = BoxedError> + Send + 'static>;
pub type BodyStream = Box<dyn Stream<Item = Result<Chunk, BoxedError>> + Send + Sync + 'static>;
pub fn wrap_error(e: Error) -> BoxedError {
Box::new(e.compat())
}
impl From<ARefs<'static, [u8]>> for Chunk {
fn from(r: ARefs<'static, [u8]>) -> Self { Chunk(r) }
impl From<ARefss<'static, [u8]>> for Chunk {
fn from(r: ARefss<'static, [u8]>) -> Self { Chunk(r) }
}
impl From<&'static [u8]> for Chunk {
fn from(r: &'static [u8]) -> Self { Chunk(ARefs::new(r)) }
fn from(r: &'static [u8]) -> Self { Chunk(ARefss::new(r)) }
}
impl From<&'static str> for Chunk {
fn from(r: &'static str) -> Self { Chunk(ARefs::new(r.as_bytes())) }
fn from(r: &'static str) -> Self { Chunk(ARefss::new(r.as_bytes())) }
}
impl From<String> for Chunk {
fn from(r: String) -> Self { Chunk(ARefs::new(r.into_bytes()).map(|v| &v[..])) }
fn from(r: String) -> Self { Chunk(ARefss::new(r.into_bytes()).map(|v| &v[..])) }
}
impl From<Vec<u8>> for Chunk {
fn from(r: Vec<u8>) -> Self { Chunk(ARefs::new(r).map(|v| &v[..])) }
fn from(r: Vec<u8>) -> Self { Chunk(ARefss::new(r).map(|v| &v[..])) }
}
impl ::bytes::Buf for Chunk {
fn remaining(&self) -> usize { self.0.len() }
fn bytes(&self) -> &[u8] { &*self.0 }
fn advance(&mut self, cnt: usize) {
self.0 = ::std::mem::replace(&mut self.0, ARefs::new(&[][..])).map(|b| &b[cnt..]);
self.0 = ::std::mem::replace(&mut self.0, ARefss::new(&[][..])).map(|b| &b[cnt..]);
}
}
pub struct Body(BodyStream);
pub struct Body(Pin<BodyStream>);
impl Payload for Body {
impl hyper::body::HttpBody for Body {
type Data = Chunk;
type Error = BoxedError;
fn poll_data(&mut self) -> ::futures::Poll<Option<Self::Data>, Self::Error> {
self.0.poll()
fn poll_data(self: Pin<&mut Self>, cx: &mut std::task::Context)
-> std::task::Poll<Option<Result<Self::Data, Self::Error>>> {
// This is safe because the pin is not structural.
// https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field
// (The field _holds_ a pin, but isn't itself pinned.)
unsafe { self.get_unchecked_mut() }.0.as_mut().poll_next(cx)
//Pin::from(self.0).as_mut().poll_next(cx)
}
fn poll_trailers(self: Pin<&mut Self>, _cx: &mut std::task::Context)
-> std::task::Poll<Result<Option<http::header::HeaderMap>, Self::Error>> {
std::task::Poll::Ready(Ok(None))
}
}
impl From<BodyStream> for Body {
fn from(b: BodyStream) -> Self { Body(b) }
fn from(b: BodyStream) -> Self { Body(Pin::from(b)) }
}
impl<C: Into<Chunk>> From<C> for Body {
fn from(c: C) -> Self {
Body(Box::new(stream::once(Ok(c.into()))))
Body(Box::pin(stream::once(futures::future::ok(c.into()))))
}
}
impl From<Error> for Body {
fn from(e: Error) -> Self {
Body(Box::new(stream::once(Err(wrap_error(e)))))
Body(Box::pin(stream::once(futures::future::err(wrap_error(e)))))
}
}
//impl<C: Into<Chunk>> From<C> for Body {
// fn from(c: C) -> Self {
// Body(Box::new(stream::once(Ok(c.into()))))
// }
//}


@ -35,15 +35,16 @@ use crate::web;
use db::{dir, writer};
use failure::{Error, ResultExt, bail};
use fnv::FnvHashMap;
use futures::{Future, Stream};
use log::{error, info, warn};
use futures::future::FutureExt;
use hyper::service::{make_service_fn, service_fn};
use log::{info, warn};
use serde::Deserialize;
use std::error::Error as StdError;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use tokio;
use tokio_signal::unix::{Signal, SIGINT, SIGTERM};
use tokio::signal::unix::{SignalKind, signal};
// These are used in a hack to get the name of the current time zone (e.g. America/Los_Angeles).
// They seem to be correct for Linux and macOS at least.
@ -92,14 +93,6 @@ struct Args {
flag_trust_forward_hdrs: bool,
}
fn setup_shutdown() -> impl Future<Item = (), Error = ()> + Send {
let int = Signal::new(SIGINT).flatten_stream().into_future();
let term = Signal::new(SIGTERM).flatten_stream().into_future();
int.select(term)
.map(|_| ())
.map_err(|_| ())
}
fn trim_zoneinfo(p: &str) -> &str {
for zp in &ZONEINFO_PATHS {
if p.starts_with(zp) {
@ -173,7 +166,8 @@ struct Syncer {
join: thread::JoinHandle<()>,
}
pub fn run() -> Result<(), Error> {
#[tokio::main]
pub async fn run() -> Result<(), Error> {
let args: Args = super::parse_args(USAGE)?;
let clocks = clock::RealClocks {};
let (_db_dir, conn) = super::open_conn(
@ -274,18 +268,29 @@ pub fn run() -> Result<(), Error> {
// Start the web interface.
let addr = args.flag_http_addr.parse().unwrap();
let server = ::hyper::server::Server::bind(&addr).tcp_nodelay(true).serve(
move || Ok::<_, Box<dyn StdError + Send + Sync>>(s.clone()));
let make_svc = make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(service_fn({
let mut s = s.clone();
move |req| Pin::from(s.serve(req))
}))
});
let server = ::hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(make_svc);
let shutdown = setup_shutdown().shared();
let mut int = signal(SignalKind::interrupt())?;
let mut term = signal(SignalKind::terminate())?;
let shutdown = futures::future::select(
Box::pin(int.recv()),
Box::pin(term.recv()));
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel();
let server = server.with_graceful_shutdown(shutdown_rx.map(|_| ()));
let server_handle = tokio::spawn(server);
info!("Ready to serve HTTP requests");
let reactor = ::std::thread::spawn({
let shutdown = shutdown.clone();
|| tokio::run(server.with_graceful_shutdown(shutdown.map(|_| ()))
.map_err(|e| error!("hyper error: {}", e)))
});
shutdown.wait().unwrap();
shutdown.await;
shutdown_tx.send(()).unwrap();
info!("Shutting down streamers.");
shutdown_streamers.store(true, Ordering::SeqCst);
@ -306,7 +311,7 @@ pub fn run() -> Result<(), Error> {
db.lock().clear_watches();
info!("Waiting for HTTP requests to finish.");
reactor.join().unwrap();
server_handle.await??;
info!("Exiting.");
Ok(())
}


@ -91,7 +91,7 @@ use log::{debug, error, trace, warn};
use memmap;
use openssl::hash;
use parking_lot::Once;
use reffers::ARefs;
use reffers::ARefss;
use crate::slices::{self, Slices};
use smallvec::SmallVec;
use std::cell::UnsafeCell;
@ -613,7 +613,7 @@ impl Slice {
fn wrap_index<F>(&self, mp4: &File, r: Range<u64>, f: &F) -> Result<Chunk, Error>
where F: Fn(&[u8], SegmentLengths) -> &[u8] {
let mp4 = ARefs::new(mp4.0.clone());
let mp4 = ARefss::new(mp4.0.clone());
let r = r.start as usize .. r.end as usize;
let p = self.p();
Ok(mp4.try_map(|mp4| Ok::<_, Error>(&mp4.segments[p].get_index(&mp4.db, f)?[r]))?.into())
@ -630,7 +630,7 @@ impl Slice {
mp4.0.db.lock()
.with_recording_playback(s.s.id, &mut |playback| s.truns(playback, pos, len))
.err_kind(ErrorKind::Unknown)?;
let truns = ARefs::new(truns);
let truns = ARefss::new(truns);
Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]).into())
}
}
@ -641,7 +641,7 @@ impl slices::Slice for Slice {
fn end(&self) -> u64 { return self.0 & 0xFF_FF_FF_FF_FF }
fn get_range(&self, f: &File, range: Range<u64>, len: u64)
-> Box<dyn Stream<Item = Self::Chunk, Error = BoxedError> + Send> {
-> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Send + Sync> {
trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len);
let p = self.p();
let res = match self.t() {
@ -651,11 +651,11 @@ impl slices::Slice for Slice {
Ok(part.into())
},
SliceType::Buf => {
let r = ARefs::new(f.0.clone());
let r = ARefss::new(f.0.clone());
Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]).into())
},
SliceType::VideoSampleEntry => {
let r = ARefs::new(f.0.clone());
let r = ARefss::new(f.0.clone());
Ok(r.map(|f| &f.video_sample_entries[p]
.data[range.start as usize .. range.end as usize]).into())
},
@ -667,7 +667,7 @@ impl slices::Slice for Slice {
SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize),
};
Box::new(stream::once(res
Box::new(stream::once(futures::future::ready(res
.map_err(|e| wrap_error(e))
.and_then(move |c| {
if c.remaining() != (range.end - range.start) as usize {
@ -677,7 +677,7 @@ impl slices::Slice for Slice {
self, range, c.remaining())));
}
Ok(c)
})))
}))))
}
fn get_slices(ctx: &File) -> &Slices<Self> { &ctx.0.slices }
@ -1444,7 +1444,7 @@ impl FileInner {
let r = s.s.sample_file_range();
pos += r.end - r.start;
}
Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
Ok(ARefss::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
}
/// Gets a `Chunk` of video sample data from disk.
@ -1470,7 +1470,7 @@ impl FileInner {
.map(&f).err_kind(ErrorKind::Internal)?
});
use core::ops::Deref;
Ok(ARefs::new(mmap).map(|m| m.deref()).into())
Ok(ARefss::new(mmap).map(|m| m.deref()).into())
}
fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, l: u64) -> Result<Chunk, Error> {
@ -1487,7 +1487,7 @@ impl FileInner {
write!(v, "{}", tm.strftime(SUBTITLE_TEMPLATE).err_kind(ErrorKind::Internal)?)
.expect("Vec write shouldn't fail");
}
Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
Ok(ARefss::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into())
}
}
@ -1512,13 +1512,13 @@ impl http_serve::Entity for File {
}
mime.extend_from_slice(b"\"");
hdrs.insert(http::header::CONTENT_TYPE,
http::header::HeaderValue::from_shared(mime.freeze()).unwrap());
http::header::HeaderValue::from_maybe_shared(mime.freeze()).unwrap());
}
fn last_modified(&self) -> Option<SystemTime> { Some(self.0.last_modified) }
fn etag(&self) -> Option<HeaderValue> { Some(self.0.etag.clone()) }
fn len(&self) -> u64 { self.0.slices.len() }
fn get_range(&self, range: Range<u64>)
-> Box<dyn Stream<Item = Self::Data, Error = Self::Error> + Send> {
-> Box<dyn Stream<Item = Result<Self::Data, Self::Error>> + Send + Sync> {
self.0.slices.get_range(self, range)
}
}
@ -1553,41 +1553,41 @@ mod tests {
use db::recording::{self, TIME_UNITS_PER_SEC};
use db::testutil::{self, TestDb, TEST_STREAM_ID};
use db::writer;
use futures::Future;
use futures::Stream as FuturesStream;
use futures::stream::TryStreamExt;
use log::info;
use openssl::hash;
use http_serve::{self, Entity};
use std::fs;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::str;
use super::*;
fn fill_slice<E: http_serve::Entity>(slice: &mut [u8], e: &E, start: u64)
async fn fill_slice<E: http_serve::Entity>(slice: &mut [u8], e: &E, start: u64)
where E::Error : ::std::fmt::Debug {
let mut p = 0;
e.get_range(start .. start + slice.len() as u64)
.for_each(|chunk| {
Pin::from(e.get_range(start .. start + slice.len() as u64))
.try_for_each(|chunk| {
let c: &[u8] = chunk.bytes();
slice[p .. p + c.len()].copy_from_slice(c);
p += c.len();
Ok::<_, E::Error>(())
futures::future::ok::<_, E::Error>(())
})
.wait()
.await
.unwrap();
}
/// Returns the SHA-1 digest of the given `Entity`.
fn digest<E: http_serve::Entity>(e: &E) -> hash::DigestBytes
async fn digest<E: http_serve::Entity>(e: &E) -> hash::DigestBytes
where E::Error : ::std::fmt::Debug {
e.get_range(0 .. e.len())
.fold(hash::Hasher::new(hash::MessageDigest::sha1()).unwrap(), |mut sha1, chunk| {
Pin::from(e.get_range(0 .. e.len()))
.try_fold(hash::Hasher::new(hash::MessageDigest::sha1()).unwrap(), |mut sha1, chunk| {
let c: &[u8] = chunk.bytes();
sha1.update(c).unwrap();
Ok::<_, E::Error>(sha1)
futures::future::ok::<_, E::Error>(sha1)
})
.wait()
.await
.unwrap()
.finish()
.unwrap()
@ -1618,14 +1618,14 @@ mod tests {
/// Pushes the box at the given position onto the stack (returning true), or returns
/// false if pos == max.
fn internal_push(&mut self, pos: u64, max: u64) -> bool {
async fn internal_push(&mut self, pos: u64, max: u64) -> bool {
if pos == max { return false; }
let mut hdr = [0u8; 16];
fill_slice(&mut hdr[..8], &self.mp4, pos);
fill_slice(&mut hdr[..8], &self.mp4, pos).await;
let (len, hdr_len, boxtype_slice) = match BigEndian::read_u32(&hdr[..4]) {
0 => (self.mp4.len() - pos, 8, &hdr[4..8]),
1 => {
fill_slice(&mut hdr[8..], &self.mp4, pos + 8);
fill_slice(&mut hdr[8..], &self.mp4, pos + 8).await;
(BigEndian::read_u64(&hdr[8..16]), 16, &hdr[4..8])
},
l => (l as u64, 8, &hdr[4..8]),
@ -1661,53 +1661,53 @@ mod tests {
/// Gets the specified byte range within the current box (excluding length and type).
/// Must not be at EOF.
pub fn get(&self, start: u64, buf: &mut [u8]) {
pub async fn get(&self, start: u64, buf: &mut [u8]) {
let interior = &self.stack.last().expect("at root").interior;
assert!(start + (buf.len() as u64) <= interior.end - interior.start,
"path={} start={} buf.len={} interior={:?}",
self.path(), start, buf.len(), interior);
fill_slice(buf, &self.mp4, start+interior.start);
fill_slice(buf, &self.mp4, start+interior.start).await;
}
pub fn get_all(&self) -> Vec<u8> {
pub async fn get_all(&self) -> Vec<u8> {
let interior = self.stack.last().expect("at root").interior.clone();
let len = (interior.end - interior.start) as usize;
trace!("get_all: start={}, len={}", interior.start, len);
let mut out = Vec::with_capacity(len);
unsafe { out.set_len(len) };
fill_slice(&mut out[..], &self.mp4, interior.start);
fill_slice(&mut out[..], &self.mp4, interior.start).await;
out
}
/// Gets the specified u32 within the current box (excluding length and type).
/// Must not be at EOF.
pub fn get_u32(&self, p: u64) -> u32 {
pub async fn get_u32(&self, p: u64) -> u32 {
let mut buf = [0u8; 4];
self.get(p, &mut buf);
self.get(p, &mut buf).await;
BigEndian::read_u32(&buf[..])
}
pub fn get_u64(&self, p: u64) -> u64 {
pub async fn get_u64(&self, p: u64) -> u64 {
let mut buf = [0u8; 8];
self.get(p, &mut buf);
self.get(p, &mut buf).await;
BigEndian::read_u64(&buf[..])
}
/// Navigates to the next box after the current one, or up if the current one is last.
pub fn next(&mut self) -> bool {
pub async fn next(&mut self) -> bool {
let old = self.stack.pop().expect("positioned at root; there is no next");
let max = self.stack.last().map(|b| b.interior.end).unwrap_or_else(|| self.mp4.len());
self.internal_push(old.interior.end, max)
self.internal_push(old.interior.end, max).await
}
/// Finds the next box of the given type after the current one, or navigates up if absent.
pub fn find(&mut self, boxtype: &[u8]) -> bool {
pub async fn find(&mut self, boxtype: &[u8]) -> bool {
trace!("looking for {}", str::from_utf8(boxtype).unwrap());
loop {
if &self.stack.last().unwrap().boxtype[..] == boxtype {
return true;
}
if !self.next() {
if !self.next().await {
return false;
}
}
@ -1717,10 +1717,11 @@ mod tests {
pub fn up(&mut self) { self.stack.pop(); }
/// Moves down the stack. Must be positioned on a box with children.
pub fn down(&mut self) {
pub async fn down(&mut self) {
let range = self.stack.last().map(|b| b.interior.clone())
.unwrap_or_else(|| 0 .. self.mp4.len());
assert!(self.internal_push(range.start, range.end), "no children in {}", self.path());
assert!(self.internal_push(range.start, range.end).await,
"no children in {}", self.path());
}
}
@ -1732,17 +1733,17 @@ mod tests {
/// Finds the `moov/trak` that has a `tkhd` associated with the given `track_id`, which must
/// exist.
fn find_track(mp4: File, track_id: u32) -> Track {
async fn find_track(mp4: File, track_id: u32) -> Track {
let mut cursor = BoxCursor::new(mp4);
cursor.down();
assert!(cursor.find(b"moov"));
cursor.down();
cursor.down().await;
assert!(cursor.find(b"moov").await);
cursor.down().await;
loop {
assert!(cursor.find(b"trak"));
cursor.down();
assert!(cursor.find(b"tkhd"));
assert!(cursor.find(b"trak").await);
cursor.down().await;
assert!(cursor.find(b"tkhd").await);
let mut version = [0u8; 1];
cursor.get(0, &mut version);
cursor.get(0, &mut version).await;
// Let id_pos be the offset after the FullBox section of the track_id.
let id_pos = match version[0] {
@ -1750,27 +1751,27 @@ mod tests {
1 => 16, // ...64-bit times...
v => panic!("unexpected tkhd version {}", v),
};
let cur_track_id = cursor.get_u32(4 + id_pos);
let cur_track_id = cursor.get_u32(4 + id_pos).await;
trace!("found moov/trak/tkhd with id {}; want {}", cur_track_id, track_id);
if cur_track_id == track_id {
break;
}
cursor.up();
assert!(cursor.next());
assert!(cursor.next().await);
}
let edts_cursor;
if cursor.find(b"edts") {
if cursor.find(b"edts").await {
edts_cursor = Some(cursor.clone());
cursor.up();
} else {
edts_cursor = None;
};
cursor.down();
assert!(cursor.find(b"mdia"));
cursor.down();
assert!(cursor.find(b"minf"));
cursor.down();
assert!(cursor.find(b"stbl"));
cursor.down().await;
assert!(cursor.find(b"mdia").await);
cursor.down().await;
assert!(cursor.find(b"minf").await);
cursor.down().await;
assert!(cursor.find(b"stbl").await);
Track{
edts_cursor: edts_cursor,
stbl_cursor: cursor,
@ -1833,17 +1834,16 @@ mod tests {
builder.build(tdb.db.clone(), tdb.dirs_by_stream_id.clone()).unwrap()
}
fn write_mp4(mp4: &File, dir: &Path) -> String {
async fn write_mp4(mp4: &File, dir: &Path) -> String {
let mut filename = dir.to_path_buf();
filename.push("clip.new.mp4");
let mut out = fs::OpenOptions::new().write(true).create_new(true).open(&filename).unwrap();
use ::std::io::Write;
mp4.get_range(0 .. mp4.len())
.for_each(|chunk| {
out.write_all(chunk.bytes())?;
Ok(())
Pin::from(mp4.get_range(0 .. mp4.len()))
.try_for_each(|chunk| {
futures::future::ready(out.write_all(chunk.bytes()).map_err(|e| e.into()))
})
.wait()
.await
.unwrap();
info!("wrote {:?}", filename);
filename.to_str().unwrap().to_string()
@ -1909,8 +1909,8 @@ mod tests {
}
/// Tests sample table for a simple video index of all sync frames.
#[test]
fn test_all_sync_frames() {
#[tokio::test]
async fn test_all_sync_frames() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut r = db::RecordingToInsert::default();
@ -1923,12 +1923,12 @@ mod tests {
// Time range [2, 2+4+6+8) means the 2nd, 3rd, and 4th samples should be included.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2 .. 2+4+6+8).unwrap();
let track = find_track(mp4, 1);
let track = find_track(mp4, 1).await;
assert!(track.edts_cursor.is_none());
let mut cursor = track.stbl_cursor;
cursor.down();
cursor.find(b"stts");
assert_eq!(cursor.get_all(), &[
cursor.down().await;
cursor.find(b"stts").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x03, // entry_count
@ -1938,8 +1938,8 @@ mod tests {
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08,
]);
cursor.find(b"stsz");
assert_eq!(cursor.get_all(), &[
cursor.find(b"stsz").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x00, // sample_size
0x00, 0x00, 0x00, 0x03, // sample_count
@ -1950,8 +1950,8 @@ mod tests {
0x00, 0x00, 0x00, 0x0c,
]);
cursor.find(b"stss");
assert_eq!(cursor.get_all(), &[
cursor.find(b"stss").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x03, // entry_count
@ -1963,8 +1963,8 @@ mod tests {
}
/// Tests sample table and edit list for a video index with half sync frames.
#[test]
fn test_half_sync_frames() {
#[tokio::test]
async fn test_half_sync_frames() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut r = db::RecordingToInsert::default();
@ -1978,13 +1978,13 @@ mod tests {
// Time range [2+4+6, 2+4+6+8) means the 4th sample should be included.
// The 3rd gets pulled in also because it's a sync frame and the 4th isn't.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, vec![r], 2+4+6 .. 2+4+6+8).unwrap();
let track = find_track(mp4, 1);
let track = find_track(mp4, 1).await;
// Examine edts. It should skip the 3rd frame.
let mut cursor = track.edts_cursor.unwrap();
cursor.down();
cursor.find(b"elst");
assert_eq!(cursor.get_all(), &[
cursor.down().await;
cursor.find(b"elst").await;
assert_eq!(cursor.get_all().await, &[
0x01, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x01, // length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, // segment_duration
@ -1994,9 +1994,9 @@ mod tests {
// Examine stbl.
let mut cursor = track.stbl_cursor;
cursor.down();
cursor.find(b"stts");
assert_eq!(cursor.get_all(), &[
cursor.down().await;
cursor.find(b"stts").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x02, // entry_count
@ -2005,8 +2005,8 @@ mod tests {
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08,
]);
cursor.find(b"stsz");
assert_eq!(cursor.get_all(), &[
cursor.find(b"stsz").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x00, // sample_size
0x00, 0x00, 0x00, 0x02, // sample_count
@ -2016,8 +2016,8 @@ mod tests {
0x00, 0x00, 0x00, 0x0c,
]);
cursor.find(b"stss");
assert_eq!(cursor.get_all(), &[
cursor.find(b"stss").await;
assert_eq!(cursor.get_all().await, &[
0x00, 0x00, 0x00, 0x00, // version + flags
0x00, 0x00, 0x00, 0x01, // entry_count
@ -2026,16 +2026,16 @@ mod tests {
]);
}
#[test]
fn test_no_segments() {
#[tokio::test]
async fn test_no_segments() {
testutil::init();
let db = TestDb::new(RealClocks {});
let e = make_mp4_from_encoders(Type::Normal, &db, vec![], 0 .. 0).err().unwrap();
assert_eq!(e.to_string(), "Invalid argument: no video_sample_entries");
}
#[test]
fn test_multi_segment() {
#[tokio::test]
async fn test_multi_segment() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut encoders = Vec::new();
@ -2054,25 +2054,25 @@ mod tests {
// This should include samples 3 and 4 only, both sync frames.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1+2 .. 1+2+3+4).unwrap();
let mut cursor = BoxCursor::new(mp4);
cursor.down();
assert!(cursor.find(b"moov"));
cursor.down();
assert!(cursor.find(b"trak"));
cursor.down();
assert!(cursor.find(b"mdia"));
cursor.down();
assert!(cursor.find(b"minf"));
cursor.down();
assert!(cursor.find(b"stbl"));
cursor.down();
assert!(cursor.find(b"stss"));
assert_eq!(cursor.get_u32(4), 2); // entry_count
assert_eq!(cursor.get_u32(8), 1);
assert_eq!(cursor.get_u32(12), 2);
cursor.down().await;
assert!(cursor.find(b"moov").await);
cursor.down().await;
assert!(cursor.find(b"trak").await);
cursor.down().await;
assert!(cursor.find(b"mdia").await);
cursor.down().await;
assert!(cursor.find(b"minf").await);
cursor.down().await;
assert!(cursor.find(b"stbl").await);
cursor.down().await;
assert!(cursor.find(b"stss").await);
assert_eq!(cursor.get_u32(4).await, 2); // entry_count
assert_eq!(cursor.get_u32(8).await, 1);
assert_eq!(cursor.get_u32(12).await, 2);
}
#[test]
fn test_zero_duration_recording() {
#[tokio::test]
async fn test_zero_duration_recording() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut encoders = Vec::new();
@ -2088,17 +2088,17 @@ mod tests {
// Multi-segment recording with an edit list, encoding with a zero-duration recording.
let mp4 = make_mp4_from_encoders(Type::Normal, &db, encoders, 1 .. 2+3).unwrap();
let track = find_track(mp4, 1);
let track = find_track(mp4, 1).await;
let mut cursor = track.edts_cursor.unwrap();
cursor.down();
cursor.find(b"elst");
assert_eq!(cursor.get_u32(4), 1); // entry_count
assert_eq!(cursor.get_u64(8), 4); // segment_duration
assert_eq!(cursor.get_u64(16), 1); // media_time
cursor.down().await;
cursor.find(b"elst").await;
assert_eq!(cursor.get_u32(4).await, 1); // entry_count
assert_eq!(cursor.get_u64(8).await, 4); // segment_duration
assert_eq!(cursor.get_u64(16).await, 1); // media_time
}
#[test]
fn test_media_segment() {
#[tokio::test]
async fn test_media_segment() {
testutil::init();
let db = TestDb::new(RealClocks {});
let mut r = db::RecordingToInsert::default();
@ -2114,45 +2114,45 @@ mod tests {
let mp4 = make_mp4_from_encoders(Type::MediaSegment, &db, vec![r],
2+4+6 .. 2+4+6+8+1).unwrap();
let mut cursor = BoxCursor::new(mp4);
cursor.down();
cursor.down().await;
let mut mdat = cursor.clone();
assert!(mdat.find(b"mdat"));
assert!(mdat.find(b"mdat").await);
assert!(cursor.find(b"moof"));
cursor.down();
assert!(cursor.find(b"traf"));
cursor.down();
assert!(cursor.find(b"trun"));
assert_eq!(cursor.get_u32(4), 2);
assert_eq!(cursor.get_u32(8) as u64, mdat.interior().start);
assert_eq!(cursor.get_u32(12), 174063616); // first_sample_flags
assert_eq!(cursor.get_u32(16), 6); // sample duration
assert_eq!(cursor.get_u32(20), 9); // sample size
assert_eq!(cursor.get_u32(24), 8); // sample duration
assert_eq!(cursor.get_u32(28), 12); // sample size
assert!(cursor.next());
assert!(cursor.find(b"moof").await);
cursor.down().await;
assert!(cursor.find(b"traf").await);
cursor.down().await;
assert!(cursor.find(b"trun").await);
assert_eq!(cursor.get_u32(4).await, 2);
assert_eq!(cursor.get_u32(8).await as u64, mdat.interior().start);
assert_eq!(cursor.get_u32(12).await, 174063616); // first_sample_flags
assert_eq!(cursor.get_u32(16).await, 6); // sample duration
assert_eq!(cursor.get_u32(20).await, 9); // sample size
assert_eq!(cursor.get_u32(24).await, 8); // sample duration
assert_eq!(cursor.get_u32(28).await, 12); // sample size
assert!(cursor.next().await);
assert_eq!(cursor.name(), "trun");
assert_eq!(cursor.get_u32(4), 1);
assert_eq!(cursor.get_u32(8) as u64, mdat.interior().start + 9 + 12);
assert_eq!(cursor.get_u32(12), 174063616); // first_sample_flags
assert_eq!(cursor.get_u32(16), 1); // sample duration
assert_eq!(cursor.get_u32(20), 15); // sample size
assert_eq!(cursor.get_u32(4).await, 1);
assert_eq!(cursor.get_u32(8).await as u64, mdat.interior().start + 9 + 12);
assert_eq!(cursor.get_u32(12).await, 174063616); // first_sample_flags
assert_eq!(cursor.get_u32(16).await, 1); // sample duration
assert_eq!(cursor.get_u32(20).await, 15); // sample size
}
#[test]
fn test_round_trip() {
#[tokio::test]
async fn test_round_trip() {
testutil::init();
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 0, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 0);
// Test the metadata. This is brittle, which is the point. Any time the digest comparison
// here fails, it can be updated, but the etag must change as well! Otherwise clients may
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
let sha1 = digest(&mp4).await;
assert_eq!("17376879bcf872dd4ad1197225a32d5473fb0dc6", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "\"953dcf1a61debe785d5dec3ae2d3992a819b68ae\"";
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
@ -2161,19 +2161,19 @@ mod tests {
db.syncer_join.join().unwrap();
}
#[test]
fn test_round_trip_with_subtitles() {
#[tokio::test]
async fn test_round_trip_with_subtitles() {
testutil::init();
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 0, true);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 0);
// Test the metadata. This is brittle, which is the point. Any time the digest comparison
// here fails, it can be updated, but the etag must change as well! Otherwise clients may
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
let sha1 = digest(&mp4).await;
assert_eq!("1cd90e0b49747cc54c953153d6709f2fb5df6b14", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "\"736655313f10747528a663190517620cdffea6d0\"";
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
@ -2182,19 +2182,19 @@ mod tests {
db.syncer_join.join().unwrap();
}
#[test]
fn test_round_trip_with_edit_list() {
#[tokio::test]
async fn test_round_trip_with_edit_list() {
testutil::init();
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 1, 0, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 1, 0);
// Test the metadata. This is brittle, which is the point. Any time the digest comparison
// here fails, it can be updated, but the etag must change as well! Otherwise clients may
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
let sha1 = digest(&mp4).await;
assert_eq!("49893e3997da6bc625a04b09abf4b1ddbe0bc85d", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "\"e87ed99dea31b7c4d1e9186045abaf5ac3c2d2f8\"";
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
@ -2203,19 +2203,19 @@ mod tests {
db.syncer_join.join().unwrap();
}
#[test]
fn test_round_trip_with_shorten() {
#[tokio::test]
async fn test_round_trip_with_shorten() {
testutil::init();
let db = TestDb::new(RealClocks {});
copy_mp4_to_db(&db);
let mp4 = create_mp4_from_db(&db, 0, 1, false);
let new_filename = write_mp4(&mp4, db.tmpdir.path());
let new_filename = write_mp4(&mp4, db.tmpdir.path()).await;
compare_mp4s(&new_filename, 0, 1);
// Test the metadata. This is brittle, which is the point. Any time the digest comparison
// here fails, it can be updated, but the etag must change as well! Otherwise clients may
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
let sha1 = digest(&mp4).await;
assert_eq!("0615feaa3c50a7889fb0e6842de3bd3d3143bc78", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "\"6f0d21a6027b0e444f404a68527dbf5c9a5c1a26\"";
assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag());
@ -2232,12 +2232,10 @@ mod bench {
use base::clock::RealClocks;
use db::recording;
use db::testutil::{self, TestDb};
use futures::{Future, future};
use futures::future;
use hyper;
use http::header;
use http_serve;
use lazy_static::lazy_static;
use std::error::Error as StdError;
use super::tests::create_mp4_from_db;
use url::Url;
@ -2259,17 +2257,24 @@ mod bench {
testutil::add_dummy_recordings_to_db(&db.db, 60);
let mp4 = create_mp4_from_db(&db, 0, 0, false);
let p = mp4.0.initial_sample_byte_pos;
let (tx, rx) = ::std::sync::mpsc::channel();
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<dyn StdError + Send + Sync>>(
MyService(mp4.clone())));
tx.send(server.local_addr()).unwrap();
::tokio::run(server.map_err(|e| panic!(e)));
let make_svc = hyper::service::make_service_fn(move |_conn| {
future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({
let mp4 = mp4.clone();
move |req| future::ok::<hyper::Response<crate::body::Body>, hyper::Error>(
http_serve::serve(mp4.clone(), &req))
}))
});
let mut rt = tokio::runtime::Runtime::new().unwrap();
let srv = rt.enter(|| {
let addr = ([127, 0, 0, 1], 0).into();
hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(make_svc)
});
let addr = srv.local_addr(); // resolve port 0 to a real ephemeral port number.
::std::thread::spawn(move || {
rt.block_on(srv).unwrap();
});
let addr = rx.recv().unwrap();
BenchServer {
url: Url::parse(&format!("http://{}:{}/", addr.ip(), addr.port())).unwrap(),
generated_len: p,
@ -2277,19 +2282,6 @@ mod bench {
}
}
struct MyService(super::File);
impl hyper::service::Service for MyService {
type ReqBody = hyper::Body;
type ResBody = crate::body::Body;
type Error = crate::body::BoxedError;
type Future = future::FutureResult<::http::Response<Self::ResBody>, Self::Error>;
fn call(&mut self, req: ::http::Request<Self::ReqBody>) -> Self::Future {
future::ok(http_serve::serve(self.0.clone(), &req))
}
}
lazy_static! {
static ref SERVER: BenchServer = { BenchServer::new() };
}
@ -2332,7 +2324,7 @@ mod bench {
let mut run = || {
let mut resp =
client.get(server.url.clone())
.header(header::RANGE, format!("bytes=0-{}", p - 1))
.header(reqwest::header::RANGE, format!("bytes=0-{}", p - 1))
.send()
.unwrap();
buf.clear();


@ -33,15 +33,16 @@
use base::format_err_t;
use crate::body::{BoxedError, wrap_error};
use failure::{Error, bail};
use futures::{Stream, stream};
use futures::{Stream, stream, stream::StreamExt};
use std::fmt;
use std::ops::Range;
use std::pin::Pin;
/// Gets a byte range given a context argument.
/// Each `Slice` instance belongs to a single `Slices`.
pub trait Slice : fmt::Debug + Sized + Sync + 'static {
type Ctx: Send + Clone;
type Chunk: Send;
type Ctx: Send + Sync + Clone;
type Chunk: Send + Sync;
/// The byte position (relative to the start of the `Slices`) of the end of this slice,
/// exclusive. Note the starting position (and thus length) are inferred from the previous
@ -52,7 +53,7 @@ pub trait Slice : fmt::Debug + Sized + Sync + 'static {
/// The additional argument `ctx` is as supplied to the `Slices`.
/// The additional argument `l` is the length of this slice, as determined by the `Slices`.
fn get_range(&self, ctx: &Self::Ctx, r: Range<u64>, len: u64)
-> Box<dyn Stream<Item = Self::Chunk, Error = BoxedError> + Send>;
-> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Sync + Send>;
fn get_slices(ctx: &Self::Ctx) -> &Slices<Self>;
}
@ -111,9 +112,9 @@ impl<S> Slices<S> where S: Slice {
/// Writes `range` to `out`.
/// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument.
pub fn get_range(&self, ctx: &S::Ctx, range: Range<u64>)
-> Box<dyn Stream<Item = S::Chunk, Error = BoxedError> + Send> {
-> Box<dyn Stream<Item = Result<S::Chunk, BoxedError>> + Sync + Send> {
if range.start > range.end || range.end > self.len {
return Box::new(stream::once(Err(wrap_error(format_err_t!(
return Box::new(stream::once(futures::future::err(wrap_error(format_err_t!(
Internal, "Bad range {:?} for slice of length {}", range, self.len)))));
}
@ -133,15 +134,15 @@ impl<S> Slices<S> where S: Slice {
let (body, min_end);
{
let self_ = S::get_slices(&c);
if i == self_.slices.len() { return None }
if i == self_.slices.len() { return futures::future::ready(None) }
let s = &self_.slices[i];
if range.end == slice_start + start_pos { return None }
if range.end == slice_start + start_pos { return futures::future::ready(None) }
let s_end = s.end();
min_end = ::std::cmp::min(range.end, s_end);
let l = s_end - slice_start;
body = s.get_range(&c, start_pos .. min_end - slice_start, l);
};
Some(Ok::<_, BoxedError>((body, (c, i+1, 0, min_end))))
futures::future::ready(Some((Pin::from(body), (c, i+1, 0, min_end))))
});
Box::new(bodies.flatten())
}
@ -151,10 +152,10 @@ impl<S> Slices<S> where S: Slice {
mod tests {
use crate::body::BoxedError;
use db::testutil;
use futures::{Future, Stream};
use futures::stream;
use futures::stream::{self, Stream, TryStreamExt};
use lazy_static::lazy_static;
use std::ops::Range;
use std::pin::Pin;
use super::{Slice, Slices};
#[derive(Debug, Eq, PartialEq)]
@ -176,8 +177,8 @@ mod tests {
fn end(&self) -> u64 { self.end }
fn get_range(&self, _ctx: &&'static Slices<FakeSlice>, r: Range<u64>, _l: u64)
-> Box<dyn Stream<Item = FakeChunk, Error = BoxedError> + Send> {
Box::new(stream::once(Ok(FakeChunk{slice: self.name, range: r})))
-> Box<dyn Stream<Item = Result<FakeChunk, BoxedError>> + Send + Sync> {
Box::new(stream::once(futures::future::ok(FakeChunk{slice: self.name, range: r})))
}
fn get_slices(ctx: &&'static Slices<FakeSlice>) -> &'static Slices<Self> { *ctx }
@ -195,33 +196,37 @@ mod tests {
};
}
async fn get_range(r: Range<u64>) -> Vec<FakeChunk> {
Pin::from(SLICES.get_range(&&*SLICES, r)).try_collect().await.unwrap()
}
#[test]
pub fn size() {
testutil::init();
assert_eq!(5 + 13 + 7 + 17 + 19, SLICES.len());
}
#[test]
pub fn exact_slice() {
#[tokio::test]
pub async fn exact_slice() {
// Test writing exactly slice b.
testutil::init();
let out = SLICES.get_range(&&*SLICES, 5 .. 18).collect().wait().unwrap();
let out = get_range(5 .. 18).await;
assert_eq!(&[FakeChunk{slice: "b", range: 0 .. 13}], &out[..]);
}
#[test]
pub fn offset_first() {
#[tokio::test]
pub async fn offset_first() {
// Test writing part of slice a.
testutil::init();
let out = SLICES.get_range(&&*SLICES, 1 .. 3).collect().wait().unwrap();
let out = get_range(1 .. 3).await;
assert_eq!(&[FakeChunk{slice: "a", range: 1 .. 3}], &out[..]);
}
#[test]
pub fn offset_mid() {
#[tokio::test]
pub async fn offset_mid() {
// Test writing part of slice b, all of slice c, and part of slice d.
testutil::init();
let out = SLICES.get_range(&&*SLICES, 17 .. 26).collect().wait().unwrap();
let out = get_range(17 .. 26).await;
assert_eq!(&[
FakeChunk{slice: "b", range: 12 .. 13},
FakeChunk{slice: "c", range: 0 .. 7},
@ -229,11 +234,11 @@ mod tests {
], &out[..]);
}
#[test]
pub fn everything() {
#[tokio::test]
pub async fn everything() {
// Test writing the whole Slices.
testutil::init();
let out = SLICES.get_range(&&*SLICES, 0 .. 61).collect().wait().unwrap();
let out = get_range(0 .. 61).await;
assert_eq!(&[
FakeChunk{slice: "a", range: 0 .. 5},
FakeChunk{slice: "b", range: 0 .. 13},
@ -243,10 +248,10 @@ mod tests {
], &out[..]);
}
#[test]
pub fn at_end() {
#[tokio::test]
pub async fn at_end() {
testutil::init();
let out = SLICES.get_range(&&*SLICES, 61 .. 61).collect().wait().unwrap();
let out = get_range(61 .. 61).await;
let empty: &[FakeChunk] = &[];
assert_eq!(empty, &out[..]);
}


@ -30,6 +30,7 @@
use base::clock::Clocks;
use base::{ErrorKind, ResultExt, bail_t, strutil};
use bytes::Bytes;
use crate::body::{Body, BoxedError};
use crate::json;
use crate::mp4;
@ -41,8 +42,8 @@ use db::{auth, recording};
use db::dir::SampleFileDir;
use failure::{Error, bail, format_err};
use fnv::FnvHashMap;
use futures::{Future, Stream, future};
use futures_cpupool;
use futures::future::{self, Future, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use http::{Request, Response, status::StatusCode};
use http_serve;
use http::header::{self, HeaderValue};
@ -56,6 +57,7 @@ use std::fs;
use std::net::IpAddr;
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use url::form_urlencoded;
use uuid::Uuid;
@ -68,6 +70,9 @@ lazy_static! {
Regex::new(r"^(\d+)(-\d+)?(@\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap();
}
type BoxedFuture = Box<dyn Future<Output = Result<Response<Body>, BoxedError>> +
Sync + Send + 'static>;
#[derive(Debug, Eq, PartialEq)]
enum Path {
TopLevel, // "/api/"
@ -254,7 +259,6 @@ struct ServiceInner {
db: Arc<db::Database>,
dirs_by_stream_id: Arc<FnvHashMap<i32, Arc<SampleFileDir>>>,
ui_files: HashMap<String, UiFile>,
pool: futures_cpupool::CpuPool,
time_zone_name: String,
allow_unauthenticated_permissions: Option<db::Permissions>,
trust_forward_hdrs: bool,
@ -505,11 +509,12 @@ impl ServiceInner {
fn static_file(&self, req: &Request<::hyper::Body>, path: &str) -> ResponseResult {
let s = self.ui_files.get(path).ok_or_else(|| not_found("no such static file"))?;
let f = fs::File::open(&s.path).map_err(internal_server_err)?;
let f = tokio::task::block_in_place(move || {
fs::File::open(&s.path).map_err(internal_server_err)
})?;
let mut hdrs = http::HeaderMap::new();
hdrs.insert(header::CONTENT_TYPE, s.mime.clone());
let e = http_serve::ChunkedReadFile::new(f, Some(self.pool.clone()), hdrs)
.map_err(internal_server_err)?;
let e = http_serve::ChunkedReadFile::new(f, hdrs).map_err(internal_server_err)?;
Ok(http_serve::serve(e, &req))
}
@ -552,7 +557,7 @@ impl ServiceInner {
.unwrap_or(false)
}
fn login(&self, req: &Request<::hyper::Body>, body: hyper::Chunk) -> ResponseResult {
fn login(&self, req: &Request<::hyper::Body>, body: Bytes) -> ResponseResult {
let mut username = None;
let mut password = None;
for (key, value) in form_urlencoded::parse(&body) {
@ -583,23 +588,24 @@ impl ServiceInner {
flags)
.map_err(|e| plain_response(StatusCode::UNAUTHORIZED, e.to_string()))?;
let s_suffix = if is_secure {
"; HttpOnly; Secure; SameSite=Strict; Max-Age=2147483648; Path=/"
&b"; HttpOnly; Secure; SameSite=Strict; Max-Age=2147483648; Path=/"[..]
} else {
"; HttpOnly; SameSite=Strict; Max-Age=2147483648; Path=/"
&b"; HttpOnly; SameSite=Strict; Max-Age=2147483648; Path=/"[..]
};
let mut encoded = [0u8; 64];
base64::encode_config_slice(&sid, base64::STANDARD_NO_PAD, &mut encoded);
let mut cookie = BytesMut::with_capacity("s=".len() + encoded.len() + s_suffix.len());
cookie.put("s=");
cookie.put(&b"s="[..]);
cookie.put(&encoded[..]);
cookie.put(s_suffix);
Ok(Response::builder()
.header(header::SET_COOKIE, cookie.freeze())
.header(header::SET_COOKIE, HeaderValue::from_maybe_shared(cookie.freeze())
.expect("cookie can't have invalid bytes"))
.status(StatusCode::NO_CONTENT)
.body(b""[..].into()).unwrap())
}
fn logout(&self, req: &Request<hyper::Body>, body: hyper::Chunk) -> ResponseResult {
fn logout(&self, req: &Request<hyper::Body>, body: Bytes) -> ResponseResult {
// Parse parameters.
let mut csrf = None;
for (key, value) in form_urlencoded::parse(&body) {
@ -649,7 +655,7 @@ impl ServiceInner {
Ok(res)
}
fn post_signals(&self, req: &Request<hyper::Body>, caller: Caller, body: hyper::Chunk)
fn post_signals(&self, req: &Request<hyper::Body>, caller: Caller, body: Bytes)
-> ResponseResult {
if !caller.permissions.update_signals {
return Err(plain_response(StatusCode::UNAUTHORIZED, "update_signals required"));
@ -769,13 +775,10 @@ fn extract_sid(req: &Request<hyper::Body>) -> Option<auth::RawSessionId> {
/// `application/x-www-form-urlencoded`, returns an appropriate error response instead.
///
/// Use with `and_then` to chain logic which consumes the form body.
fn with_form_body(mut req: Request<hyper::Body>)
-> Box<dyn Future<Item = (Request<hyper::Body>, hyper::Chunk),
Error = Response<Body>> +
Send + 'static> {
async fn with_form_body(mut req: Request<hyper::Body>)
-> Result<(Request<hyper::Body>, Bytes), Response<Body>> {
if *req.method() != http::method::Method::POST {
return Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST expected")));
return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected"));
}
let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) {
Some(t) if t == "application/x-www-form-urlencoded" => true,
@ -783,25 +786,21 @@ fn with_form_body(mut req: Request<hyper::Body>)
_ => false,
};
if !correct_mime_type {
return Box::new(future::err(bad_req(
"expected application/x-www-form-urlencoded request body")));
return Err(bad_req("expected application/x-www-form-urlencoded request body"));
}
let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty());
Box::new(b.concat2()
.map(|b| (req, b))
.map_err(|e| internal_server_err(format_err!("unable to read request body: {}",
e))))
match hyper::body::to_bytes(b).await {
Ok(b) => Ok((req, b)),
Err(e) => Err(internal_server_err(format_err!("unable to read request body: {}", e))),
}
}
// TODO: remove redundancy with above. Probably better to just always expect requests in json
// format rather than using the form style for login/logout.
fn with_json_body(mut req: Request<hyper::Body>)
-> Box<dyn Future<Item = (Request<hyper::Body>, hyper::Chunk),
Error = Response<Body>> +
Send + 'static> {
async fn with_json_body(mut req: Request<hyper::Body>)
-> Result<(Request<hyper::Body>, Bytes), Response<Body>> {
if *req.method() != http::method::Method::POST {
return Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST expected")));
return Err(plain_response(StatusCode::METHOD_NOT_ALLOWED, "POST expected"));
}
let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) {
Some(t) if t == "application/json" => true,
@ -809,14 +808,13 @@ fn with_json_body(mut req: Request<hyper::Body>)
_ => false,
};
if !correct_mime_type {
return Box::new(future::err(bad_req(
"expected application/json request body")));
return Err(bad_req("expected application/json request body"));
}
let b = ::std::mem::replace(req.body_mut(), hyper::Body::empty());
Box::new(b.concat2()
.map(|b| (req, b))
.map_err(|e| internal_server_err(format_err!("unable to read request body: {}",
e))))
match hyper::body::to_bytes(b).await {
Ok(b) => Ok((req, b)),
Err(e) => Err(internal_server_err(format_err!("unable to read request body: {}", e))),
}
}
@ -859,7 +857,6 @@ impl Service {
db: config.db,
dirs_by_stream_id,
ui_files,
pool: futures_cpupool::Builder::new().pool_size(1).name_prefix("static").create(),
allow_unauthenticated_permissions: config.allow_unauthenticated_permissions,
trust_forward_hdrs: config.trust_forward_hdrs,
time_zone_name: config.time_zone_name,
@ -914,7 +911,7 @@ impl Service {
}
let stream_id;
let open_id;
let (sub_tx, sub_rx) = futures::sync::mpsc::unbounded();
let (sub_tx, sub_rx) = futures::channel::mpsc::unbounded();
{
let mut db = self.0.db.lock();
open_id = match db.open {
@ -934,9 +931,8 @@ impl Service {
.expect("stream_id refed by camera");
}
let inner = self.0.clone();
let body: crate::body::BodyStream = Box::new(sub_rx
.map_err(|()| unreachable!())
.and_then(move |live| -> Result<_, base::Error> {
let body = sub_rx
.map(move |live| -> Result<_, base::Error> {
let mut builder = mp4::FileBuilder::new(mp4::Type::MediaSegment);
let mut vse_id = None;
{
@ -960,7 +956,6 @@ impl Service {
let mp4 = builder.build(inner.db.clone(), inner.dirs_by_stream_id.clone())?;
let mut hdrs = http::header::HeaderMap::new();
mp4.add_headers(&mut hdrs);
//Ok(format!("{:?}\n\n", mp4).into())
let mime_type = hdrs.get(http::header::CONTENT_TYPE).unwrap();
let len = mp4.len();
use futures::stream::once;
@ -977,16 +972,17 @@ impl Service {
live.off_90k.start,
live.off_90k.end,
&vse_id);
let v: Vec<crate::body::BodyStream> = vec![
Box::new(once(Ok(hdr.into()))),
mp4.get_range(0 .. len),
Box::new(once(Ok("\r\n\r\n".into())))
let v: Vec<Pin<crate::body::BodyStream>> = vec![
Box::pin(once(futures::future::ok(hdr.into()))),
Pin::from(mp4.get_range(0 .. len)),
Box::pin(once(futures::future::ok("\r\n\r\n".into())))
];
Ok(futures::stream::iter_ok::<_, crate::body::BoxedError>(v))
})
.map_err(|e| Box::new(e.compat()))
.flatten()
.flatten());
Ok(futures::stream::iter(v).flatten())
});
let body = body.map_err::<BoxedError, _>(|e| Box::new(e.compat()));
let _: &dyn Stream<Item = Result<_, BoxedError>> = &body;
let body = body.try_flatten();
let body: crate::body::BodyStream = Box::new(body);
let body: Body = body.into();
Ok(http::Response::builder()
.header("X-Open-Id", open_id.to_string())
@ -996,32 +992,24 @@ impl Service {
}
fn signals(&self, req: Request<hyper::Body>, caller: Caller)
-> Box<dyn Future<Item = Response<Body>, Error = Response<Body>> + Send + 'static> {
-> Box<dyn Future<Output = Result<Response<Body>, Response<Body>>> + Send + Sync + 'static> {
use http::method::Method;
match *req.method() {
Method::POST => Box::new(with_json_body(req)
.and_then({
let s = self.0.clone();
move |(req, b)| s.post_signals(&req, caller, b)
move |(req, b)| future::ready(s.post_signals(&req, caller, b))
})),
Method::GET | Method::HEAD => Box::new(future::result(self.0.get_signals(&req))),
Method::GET | Method::HEAD => Box::new(future::ready(self.0.get_signals(&req))),
_ => Box::new(future::err(plain_response(StatusCode::METHOD_NOT_ALLOWED,
"POST, GET, or HEAD expected"))),
}
}
}
impl ::hyper::service::Service for Service {
type ReqBody = ::hyper::Body;
type ResBody = Body;
type Error = BoxedError;
type Future = Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
fn call(&mut self, req: Request<::hyper::Body>) -> Self::Future {
fn wrap<R>(is_private: bool, r: R)
-> Box<dyn Future<Item = Response<Body>, Error = BoxedError> + Send + 'static>
where R: Future<Item = Response<Body>, Error = Response<Body>> + Send + 'static {
return Box::new(r.or_else(|e| Ok(e)).map(move |mut r| {
pub fn serve(&mut self, req: Request<::hyper::Body>) -> BoxedFuture {
fn wrap<R>(is_private: bool, r: R) -> BoxedFuture
where R: Future<Output = Result<Response<Body>, Response<Body>>> + Send + Sync + 'static {
return Box::new(r.or_else(|e| futures::future::ok(e)).map_ok(move |mut r| {
if is_private {
r.headers_mut().insert("Cache-Control", HeaderValue::from_static("private"));
}
@ -1030,8 +1018,8 @@ impl ::hyper::service::Service for Service {
}
fn wrap_r(is_private: bool, r: ResponseResult)
-> Box<dyn Future<Item = Response<Body>, Error = BoxedError> + Send + 'static> {
return wrap(is_private, future::result(r))
-> Box<dyn Future<Output = Result<Response<Body>, BoxedError>> + Send + Sync + 'static> {
return wrap(is_private, future::ready(r))
}
let p = Path::decode(req.uri().path());
@ -1066,13 +1054,13 @@ impl ::hyper::service::Service for Service {
Path::NotFound => wrap(true, future::err(not_found("path not understood"))),
Path::Login => wrap(true, with_form_body(req).and_then({
let s = self.clone();
move |(req, b)| { s.0.login(&req, b) }
move |(req, b)| future::ready(s.0.login(&req, b))
})),
Path::Logout => wrap(true, with_form_body(req).and_then({
let s = self.clone();
move |(req, b)| { s.0.logout(&req, b) }
move |(req, b)| future::ready(s.0.logout(&req, b))
})),
Path::Signals => wrap(true, self.signals(req, caller)),
Path::Signals => wrap(true, Pin::from(self.signals(req, caller))),
Path::Static => wrap_r(false, self.0.static_file(&req, req.uri().path())),
}
}
@ -1081,11 +1069,9 @@ impl ::hyper::service::Service for Service {
#[cfg(test)]
mod tests {
use db::testutil::{self, TestDb};
use futures::Future;
use http::header;
use futures::future::FutureExt;
use log::info;
use std::collections::HashMap;
use std::error::Error as StdError;
use super::Segments;
struct Server {
@ -1093,14 +1079,13 @@ mod tests {
base_url: String,
//test_camera_uuid: Uuid,
handle: Option<::std::thread::JoinHandle<()>>,
shutdown_tx: Option<futures::sync::oneshot::Sender<()>>,
shutdown_tx: Option<futures::channel::oneshot::Sender<()>>,
}
impl Server {
fn new(allow_unauthenticated_permissions: Option<db::Permissions>) -> Server {
let db = TestDb::new(base::clock::RealClocks {});
let (shutdown_tx, shutdown_rx) = futures::sync::oneshot::channel::<()>();
let addr = "127.0.0.1:0".parse().unwrap();
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel::<()>();
let service = super::Service::new(super::Config {
db: db.db.clone(),
ui_dir: None,
@ -1108,12 +1093,22 @@ mod tests {
trust_forward_hdrs: true,
time_zone_name: "".to_owned(),
}).unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<dyn StdError + Send + Sync>>(service.clone()));
let addr = server.local_addr(); // resolve port 0 to a real ephemeral port number.
let make_svc = hyper::service::make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({
let mut s = service.clone();
move |req| std::pin::Pin::from(s.serve(req))
}))
});
let mut rt = tokio::runtime::Runtime::new().unwrap();
let srv = rt.enter(|| {
let addr = ([127, 0, 0, 1], 0).into();
hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(make_svc)
});
let addr = srv.local_addr(); // resolve port 0 to a real ephemeral port number.
let handle = ::std::thread::spawn(move || {
::tokio::run(server.with_graceful_shutdown(shutdown_rx).map_err(|e| panic!(e)));
rt.block_on(srv.with_graceful_shutdown(shutdown_rx.map(|_| ()))).unwrap();
});
// Create a user.
@ -1141,14 +1136,14 @@ mod tests {
struct SessionCookie(Option<String>);
impl SessionCookie {
pub fn new(headers: &http::HeaderMap) -> Self {
pub fn new(headers: &reqwest::header::HeaderMap) -> Self {
let mut c = SessionCookie::default();
c.update(headers);
c
}
pub fn update(&mut self, headers: &http::HeaderMap) {
for set_cookie in headers.get_all(header::SET_COOKIE) {
pub fn update(&mut self, headers: &reqwest::header::HeaderMap) {
for set_cookie in headers.get_all(reqwest::header::SET_COOKIE) {
let mut set_cookie = set_cookie.to_str().unwrap().split("; ");
let c = set_cookie.next().unwrap();
let mut clear = false;
@ -1256,7 +1251,7 @@ mod tests {
let s = Server::new(None);
let cli = reqwest::Client::new();
let resp = cli.get(&format!("{}/api/", &s.base_url)).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
assert_eq!(resp.status(), reqwest::StatusCode::UNAUTHORIZED);
}
#[test]
@ -1267,29 +1262,29 @@ mod tests {
let login_url = format!("{}/api/login", &s.base_url);
let resp = cli.get(&login_url).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::METHOD_NOT_ALLOWED);
assert_eq!(resp.status(), reqwest::StatusCode::METHOD_NOT_ALLOWED);
let resp = cli.post(&login_url).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
let mut p = HashMap::new();
p.insert("username", "slamb");
p.insert("password", "asdf");
let resp = cli.post(&login_url).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
assert_eq!(resp.status(), reqwest::StatusCode::UNAUTHORIZED);
p.insert("password", "hunter2");
let resp = cli.post(&login_url).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT);
let cookie = SessionCookie::new(resp.headers());
info!("cookie: {:?}", cookie);
info!("header: {}", cookie.header());
let resp = cli.get(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
assert_eq!(resp.status(), reqwest::StatusCode::OK);
}
#[test]
@ -1301,38 +1296,38 @@ mod tests {
p.insert("username", "slamb");
p.insert("password", "hunter2");
let resp = cli.post(&format!("{}/api/login", &s.base_url)).form(&p).send().unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT);
let cookie = SessionCookie::new(resp.headers());
// A GET shouldn't work.
let resp = cli.get(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::METHOD_NOT_ALLOWED);
assert_eq!(resp.status(), reqwest::StatusCode::METHOD_NOT_ALLOWED);
// Neither should a POST without a csrf token.
let resp = cli.post(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
// But it should work with the csrf token.
// Retrieve that from the toplevel API request.
let toplevel: serde_json::Value = cli.post(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.send().unwrap()
.json().unwrap();
let csrf = toplevel.get("session").unwrap().get("csrf").unwrap().as_str();
let mut p = HashMap::new();
p.insert("csrf", csrf);
let resp = cli.post(&format!("{}/api/logout", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.form(&p)
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::NO_CONTENT);
assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT);
let mut updated_cookie = cookie.clone();
updated_cookie.update(resp.headers());
@ -1341,10 +1336,10 @@ mod tests {
// It should also be invalidated server-side.
let resp = cli.get(&format!("{}/api/", &s.base_url))
.header(header::COOKIE, cookie.header())
.header(reqwest::header::COOKIE, cookie.header())
.send()
.unwrap();
assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
assert_eq!(resp.status(), reqwest::StatusCode::UNAUTHORIZED);
}
#[test]
@ -1357,7 +1352,7 @@ mod tests {
let resp = cli.get(
&format!("{}/api/cameras/{}/main/view.mp4", &s.base_url, s.db.test_camera_uuid))
.send().unwrap();
assert_eq!(resp.status(), http::StatusCode::BAD_REQUEST);
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
}
}
@ -1366,10 +1361,8 @@ mod bench {
extern crate test;
use db::testutil::{self, TestDb};
use futures::Future;
use hyper;
use lazy_static::lazy_static;
use std::error::Error as StdError;
use uuid::Uuid;
struct Server {
@ -1382,7 +1375,6 @@ mod bench {
let db = TestDb::new(::base::clock::RealClocks {});
let test_camera_uuid = db.test_camera_uuid;
testutil::add_dummy_recordings_to_db(&db.db, 1440);
let addr = "127.0.0.1:0".parse().unwrap();
let service = super::Service::new(super::Config {
db: db.db.clone(),
ui_dir: None,
@ -1390,12 +1382,22 @@ mod bench {
trust_forward_hdrs: false,
time_zone_name: "".to_owned(),
}).unwrap();
let server = hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(move || Ok::<_, Box<dyn StdError + Send + Sync>>(service.clone()));
let addr = server.local_addr(); // resolve port 0 to a real ephemeral port number.
let make_svc = hyper::service::make_service_fn(move |_conn| {
futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({
let mut s = service.clone();
move |req| std::pin::Pin::from(s.serve(req))
}))
});
let mut rt = tokio::runtime::Runtime::new().unwrap();
let srv = rt.enter(|| {
let addr = ([127, 0, 0, 1], 0).into();
hyper::server::Server::bind(&addr)
.tcp_nodelay(true)
.serve(make_svc)
});
let addr = srv.local_addr(); // resolve port 0 to a real ephemeral port number.
::std::thread::spawn(move || {
::tokio::run(server.map_err(|e| panic!(e)));
rt.block_on(srv).unwrap();
});
Server {
base_url: format!("http://{}:{}", addr.ip(), addr.port()),