update deps (particularly hyper) + fix warnings

Scott Lamb 2017-09-21 21:51:58 -07:00
parent 857a66f29c
commit 11420df065
9 changed files with 439 additions and 426 deletions

Cargo.lock (generated): 739 changed lines; file diff suppressed because it is too large.


@@ -18,8 +18,8 @@ byteorder = "1.0"
 docopt = "0.8"
 futures = "0.1"
 fnv = "1.0"
-http-entity = { git = "https://github.com/scottlamb/http-entity", branch = "hyper-0.11.x" }
-hyper = { git = "https://github.com/scottlamb/hyper", branch = "moonfire-on-0.11.x" }
+http-entity = { git = "https://github.com/scottlamb/http-entity" }
+hyper = "0.11.2"
 lazy_static = "0.2"
 libc = "0.2"
 log = { version = "0.3", features = ["release_max_level_info"] }
@@ -44,7 +44,7 @@ url = "1.4"
 uuid = { version = "0.5", features = ["serde", "v4"] }

 [dev-dependencies]
-reqwest = "0.6"
+reqwest = "0.7"
 tempdir = "0.3"

 [dependencies.cursive]
@@ -59,5 +59,4 @@ debug = true
 debug = true

 [replace]
-"https://github.com/hyperium/hyper#hyper:0.11.0-a.0" = { git = "https://github.com/scottlamb/hyper", branch = "moonfire-on-0.11.x" }
-"hyper:0.10.12" = { git = "https://github.com/scottlamb/hyper", branch = "moonfire-on-0.10.x" }
+"hyper:0.11.2" = { git = "https://github.com/scottlamb/hyper", branch = "moonfire-on-0.11.x" }


@@ -76,7 +76,11 @@ static int lock_callback(void **mutex, enum AVLockOp op) {
     case AV_LOCK_RELEASE:
         if (pthread_mutex_unlock(*mutex) != 0)
             return -1;
+        break;
+    default:
+        return -1;
     }
+    return 0;
 }

 void moonfire_ffmpeg_init(void) {
@@ -91,7 +95,7 @@ struct moonfire_ffmpeg_streams {
 };

 struct moonfire_ffmpeg_data {
-    const char *data;
+    uint8_t *data;
     size_t len;
 };



@@ -32,7 +32,7 @@ use clock;
 use db;
 use dir;
 use error::Error;
-use futures::{BoxFuture, Future, Stream};
+use futures::{Future, Stream};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
@@ -66,13 +66,12 @@ struct Args {
     flag_read_only: bool,
 }

-fn setup_shutdown_future(h: &reactor::Handle) -> BoxFuture<(), ()> {
+fn setup_shutdown_future(h: &reactor::Handle) -> Box<Future<Item = (), Error = ()>> {
     let int = Signal::new(SIGINT, h).flatten_stream().into_future();
     let term = Signal::new(SIGTERM, h).flatten_stream().into_future();
-    int.select(term)
-        .map(|_| ())
-        .map_err(|_| ())
-        .boxed()
+    Box::new(int.select(term)
+        .map(|_| ())
+        .map_err(|_| ()))
 }

 pub fn run() -> Result<(), Error> {
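The `setup_shutdown_future` change follows futures 0.1's move away from `BoxFuture` and `.boxed()` (which also require `Send`) toward plain boxed trait objects. A minimal sketch of the same pattern, using an illustrative stand-in future rather than the real signal streams:

    extern crate futures;

    use futures::{future, Future};

    // Old: fn shutdown() -> BoxFuture<(), ()> { ... .boxed() }
    // New: name the trait object explicitly and wrap the chain in Box::new().
    fn shutdown() -> Box<Future<Item = (), Error = ()>> {
        Box::new(future::ok::<(), ()>(())
            .map(|_| ())
            .map_err(|_| ()))
    }

    fn main() {
        // Box<Future<...>> still implements Future, so callers are unchanged.
        shutdown().wait().unwrap();
    }

Only the boxing changes; the combinator chain inside is the same as before.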


@@ -613,7 +613,7 @@ impl<'a> Transaction<'a> {
             (":uuid", &uuid),
             (":state", &(ReservationState::Deleting as i64))
         ])?;
-        let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, row.camera_id);
+        let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, row.camera_id);
         m.duration -= row.time.end - row.time.start;
         m.sample_file_bytes -= row.sample_file_bytes as i64;
         adjust_days(row.time.clone(), -1, &mut m.days);
@@ -665,7 +665,7 @@ impl<'a> Transaction<'a> {
             }
         }
         self.must_rollback = true;
-        let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
+        let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
         {
             let recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id);
             let composite_id = composite_id(r.camera_id, recording_id);
@@ -720,7 +720,7 @@ impl<'a> Transaction<'a> {
         if changes != 1 {
             return Err(Error::new(format!("no such camera {}", camera_id)));
         }
-        let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, camera_id);
+        let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, camera_id);
         m.new_retain_bytes = Some(new_limit);
         Ok(())
     }
@@ -731,7 +731,7 @@ impl<'a> Transaction<'a> {
         self.precommit()?;
         self.tx.commit()?;
         for (&camera_id, m) in &self.mods_by_camera {
-            let mut camera = self.state.cameras_by_id.get_mut(&camera_id)
+            let camera = self.state.cameras_by_id.get_mut(&camera_id)
                 .expect("modified camera must exist");
             camera.duration += m.duration;
             camera.sample_file_bytes += m.sample_file_bytes;
@@ -1348,7 +1348,7 @@ impl Database {
             },
         }));
         {
-            let mut l = &mut *db.lock();
+            let l = &mut *db.lock();
             l.init_video_sample_entries().annotate_err("init_video_sample_entries")?;
             l.init_cameras().annotate_err("init_cameras")?;
             for (&camera_id, ref mut camera) in &mut l.state.cameras_by_id {
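These db.rs edits are pure warning fixes: each binding already holds a `&mut` reference returned by a helper, so marking the binding itself `mut` only trips rustc's `unused_mut` lint. A self-contained illustration with made-up stand-in types (not the real `Transaction` internals):

    // `CameraMod` / `get_mod` are illustrative only.
    struct CameraMod { duration: i64 }

    fn get_mod(mods: &mut Vec<CameraMod>) -> &mut CameraMod {
        &mut mods[0]
    }

    fn main() {
        let mut mods = vec![CameraMod { duration: 0 }];
        {
            // `m` is already a `&mut CameraMod`; writing `let mut m = ...` would be
            // flagged by `unused_mut`, since the binding itself is never reassigned.
            let m = get_mod(&mut mods);
            m.duration += 90;
        }
        assert_eq!(mods[0].duration, 90);
    }

Mutation happens through the reference; `mut` on the binding would only be needed if `m` itself were reassigned.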


@@ -76,14 +76,12 @@
 //! * mdat (media data container)
 //! ```

-extern crate byteorder;
 extern crate time;

 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
 use db;
 use dir;
 use error::Error;
-use futures::Stream;
 use futures::stream;
 use http_entity;
 use hyper::header;
@@ -396,7 +394,7 @@ impl Segment {
             v.into_boxed_slice()
         };
         {
-            let (stts, mut rest) = buf.split_at_mut(lens.stts);
+            let (stts, rest) = buf.split_at_mut(lens.stts);
             let (stsz, stss) = rest.split_at_mut(lens.stsz);
             let mut frame = 0;
             let mut key_frame = 0;
@@ -536,7 +534,7 @@ impl slices::Slice for Slice {
            SliceType::VideoSampleData => f.0.get_video_sample_data(p, range.clone()),
            SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
        };
-        stream::once(res
+        Box::new(stream::once(res
            .map_err(|e| {
                error!("Error producing {:?}: {:?}", self, e);
                ::hyper::Error::Incomplete
@@ -548,7 +546,7 @@
                    return Err(::hyper::Error::Incomplete);
                }
                Ok(c)
-            })).boxed()
+            })))
     }

     fn get_slices(ctx: &File) -> &Slices<Self> { &ctx.0.slices }
@@ -1217,7 +1215,10 @@ impl FileInner {
 #[derive(Clone)]
 pub struct File(Arc<FileInner>);

-impl http_entity::Entity<Body> for File {
+impl http_entity::Entity for File {
+    type Chunk = slices::Chunk;
+    type Body = slices::Body;
+
     fn add_headers(&self, hdrs: &mut header::Headers) {
         hdrs.set(header::ContentType("video/mp4".parse().unwrap()));
     }
@@ -1258,12 +1259,13 @@ mod tests {
     use stream::{self, Opener, Stream};
     use testutil::{self, TestDb, TEST_CAMERA_ID};

-    fn fill_slice(slice: &mut [u8], e: &http_entity::Entity<Body>, start: u64) {
+    fn fill_slice<E: http_entity::Entity>(slice: &mut [u8], e: &E, start: u64) {
         let mut p = 0;
         e.get_range(start .. start + slice.len() as u64)
          .for_each(|chunk| {
-             slice[p .. p + chunk.len()].copy_from_slice(&chunk);
-             p += chunk.len();
+             let c: &[u8] = chunk.as_ref();
+             slice[p .. p + c.len()].copy_from_slice(c);
+             p += c.len();
              Ok::<_, ::hyper::Error>(())
          })
          .wait()
@@ -1271,10 +1273,11 @@
     }

     /// Returns the SHA-1 digest of the given `Entity`.
-    fn digest(e: &http_entity::Entity<Body>) -> hash::DigestBytes {
+    fn digest<E: http_entity::Entity>(e: &E) -> hash::DigestBytes {
         e.get_range(0 .. e.len())
          .fold(hash::Hasher::new(hash::MessageDigest::sha1()).unwrap(), |mut sha1, chunk| {
-             sha1.update(&chunk).unwrap();
+             let c: &[u8] = chunk.as_ref();
+             sha1.update(c).unwrap();
              Ok::<_, ::hyper::Error>(sha1)
          })
          .wait()
@@ -1770,14 +1773,13 @@ mod bench {
     extern crate reqwest;
     extern crate test;

+    use futures::Stream;
     use futures::future;
-    use futures::stream::BoxStream;
     use hyper;
     use http_entity;
     use recording;
     use reffers::ARefs;
     use self::test::Bencher;
-    use std::str;
     use super::tests::create_mp4_from_db;
     use testutil::{self, TestDb};
     use url::Url;
@@ -1821,7 +1823,8 @@
     impl hyper::server::Service for MyService {
         type Request = hyper::server::Request;
-        type Response = hyper::server::Response<BoxStream<ARefs<'static, [u8]>, hyper::Error>>;
+        type Response = hyper::server::Response<
+            Box<Stream<Item = ARefs<'static, [u8]>, Error = hyper::Error> + Send>>;
         type Error = hyper::Error;
         type Future = future::FutureResult<Self::Response, Self::Error>;
@@ -1873,6 +1876,7 @@
         use self::reqwest::header::{Range, ByteRangeSpec};
         let mut resp =
             client.get(server.url.clone())
+                  .unwrap()
                   .header(Range::Bytes(vec![ByteRangeSpec::FromTo(0, p - 1)]))
                   .send()
                   .unwrap();
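The `http_entity` change visible above replaces the trait's body type parameter (`Entity<Body>`) with associated types, which is why `File` now declares `type Chunk`/`type Body` and the test helpers become generic over `E: http_entity::Entity`, reading each chunk through `AsRef<[u8]>`. A rough sketch of that trait shape only; the real trait in scottlamb/http-entity has more methods (headers, a streaming `get_range`, etc.), and `first_chunk` here is an invented stand-in:

    // Approximation of the associated-type style, not the crate's exact definition.
    trait Entity {
        type Chunk: AsRef<[u8]>;

        fn len(&self) -> u64;
        fn first_chunk(&self) -> Self::Chunk;
    }

    struct InMemory(Vec<u8>);

    impl Entity for InMemory {
        type Chunk = Vec<u8>;

        fn len(&self) -> u64 { self.0.len() as u64 }
        fn first_chunk(&self) -> Vec<u8> { self.0.clone() }
    }

    // Consumers (like the test helpers above) bound on the trait and convert each
    // chunk with `as_ref()` before treating it as bytes.
    fn first_byte<E: Entity>(e: &E) -> Option<u8> {
        let chunk = e.first_chunk();
        let c: &[u8] = chunk.as_ref();
        c.first().cloned()
    }

    fn main() {
        let e = InMemory(b"ftyp".to_vec());
        assert_eq!(e.len(), 4);
        assert_eq!(first_byte(&e), Some(b'f'));
    }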


@@ -28,8 +28,6 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.

-extern crate uuid;
-
 use coding::{append_varint32, decode_varint32, unzigzag32, zigzag32};
 use core::str::FromStr;
 use db;


@@ -31,13 +31,13 @@
 //! Tools for implementing a `http_entity::Entity` body composed from many "slices".

 use reffers::ARefs;
-use futures::stream::{self, BoxStream};
+use futures::stream;
 use futures::Stream;
 use std::fmt;
 use std::ops::Range;

 pub type Chunk = ARefs<'static, [u8]>;
-pub type Body = stream::BoxStream<Chunk, ::hyper::Error>;
+pub type Body = Box<Stream<Item = Chunk, Error = ::hyper::Error> + Send>;

 /// Writes a byte range to the given `io::Write` given a context argument; meant for use with
 /// `Slices`.
@@ -53,7 +53,7 @@ pub trait Slice : Sync + Sized + 'static {
     /// The additional argument `ctx` is as supplied to the `Slices`.
     /// The additional argument `l` is the length of this slice, as determined by the `Slices`.
     fn get_range(&self, ctx: &Self::Ctx, r: Range<u64>, len: u64)
-                 -> stream::BoxStream<Self::Chunk, ::hyper::Error>;
+                 -> Box<Stream<Item = Self::Chunk, Error = ::hyper::Error> + Send>;

     fn get_slices(ctx: &Self::Ctx) -> &Slices<Self>;
 }
@@ -108,10 +108,10 @@ impl<S> Slices<S> where S: Slice {
     /// Writes `range` to `out`.
     /// This interface mirrors `http_entity::Entity::write_to`, with the additional `ctx` argument.
     pub fn get_range(&self, ctx: &S::Ctx, range: Range<u64>)
-                     -> BoxStream<S::Chunk, ::hyper::Error> {
+                     -> Box<Stream<Item = S::Chunk, Error = ::hyper::Error> + Send> {
         if range.start > range.end || range.end > self.len {
             error!("Bad range {:?} for slice of length {}", range, self.len);
-            return stream::once(Err(::hyper::Error::Incomplete)).boxed();
+            return Box::new(stream::once(Err(::hyper::Error::Incomplete)));
         }

         // Binary search for the first slice of the range to write, determining its index and
@@ -139,14 +139,14 @@ impl<S> Slices<S> where S: Slice {
             };
             Some(Ok::<_, ::hyper::Error>((body, (c, i+1, 0, end))))
         });
-        bodies.flatten().boxed()
+        Box::new(bodies.flatten())
     }
 }

 #[cfg(test)]
 mod tests {
     use futures::{Future, Stream};
-    use futures::stream::{self, BoxStream};
+    use futures::stream;
     use std::ops::Range;
     use super::{Slice, Slices};
     use testutil;
@@ -169,8 +169,8 @@ mod tests {
         fn end(&self) -> u64 { self.end }
         fn get_range(&self, _ctx: &&'static Slices<FakeSlice>, r: Range<u64>, _l: u64)
-                     -> BoxStream<FakeChunk, ::hyper::Error> {
-            stream::once(Ok(FakeChunk{slice: self.name, range: r})).boxed()
+                     -> Box<Stream<Item = FakeChunk, Error = ::hyper::Error> + Send> {
+            Box::new(stream::once(Ok(FakeChunk{slice: self.name, range: r})))
         }

         fn get_slices(ctx: &&'static Slices<FakeSlice>) -> &'static Slices<Self> { *ctx }
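The same cleanup applies to streams: `futures::stream::BoxStream` and `.boxed()` give way to an explicit `Box<Stream<...> + Send>` alias built with `Box::new`. A small sketch of that shape under those assumptions, with `Vec<u8>` standing in for the real `ARefs<'static, [u8]>` chunk type:

    extern crate futures;
    extern crate hyper;

    use futures::{stream, Future, Stream};

    type Chunk = Vec<u8>;
    type Body = Box<Stream<Item = Chunk, Error = hyper::Error> + Send>;

    // Instead of `stream::once(...).boxed()`, wrap the concrete stream in Box::new().
    fn single_chunk_body(data: Vec<u8>) -> Body {
        Box::new(stream::once(Ok(data)))
    }

    fn main() {
        // Box<Stream> still implements Stream, so combinators keep working.
        let total = single_chunk_body(b"not found".to_vec())
            .fold(0, |n, c| Ok::<_, hyper::Error>(n + c.len()))
            .wait()
            .unwrap();
        assert_eq!(total, 9);
    }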


@@ -35,11 +35,10 @@ use core::str::FromStr;
 use db;
 use dir::SampleFileDir;
 use error::Error;
-use futures::Stream;
 use futures::{future, stream};
 use json;
 use http_entity;
-use hyper::{header, status};
+use hyper::header;
 use hyper::server::{self, Request, Response};
 use mime;
 use mp4;
@@ -224,10 +223,11 @@ impl Service {
     }

     fn not_found(&self) -> Result<Response<slices::Body>, Error> {
+        let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(&b"not found"[..]))));
         Ok(Response::new()
-           .with_status(status::StatusCode::NotFound)
+           .with_status(hyper::StatusCode::NotFound)
            .with_header(header::ContentType(mime::TEXT_PLAIN))
-           .with_body(stream::once(Ok(ARefs::new(&b"not found"[..]))).boxed()))
+           .with_body(body))
     }

     fn list_cameras(&self, req: &Request) -> Result<Response<slices::Body>, Error> {
@@ -240,11 +240,13 @@ impl Service {
                 self.list_cameras_html(db)?
             }
         };
+        let len = buf.len();
+        let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(buf))));
         Ok(Response::new()
            .with_header(header::ContentType(if json { mime::APPLICATION_JSON }
                                             else { mime::TEXT_HTML }))
-           .with_header(header::ContentLength(buf.len() as u64))
-           .with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
+           .with_header(header::ContentLength(len as u64))
+           .with_body(body))
     }

     fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>, Error> {
@@ -294,11 +296,13 @@ impl Service {
                 self.camera_html(db, query, uuid)?
             }
         };
+        let len = buf.len();
+        let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(buf))));
         Ok(Response::new()
            .with_header(header::ContentType(if json { mime::APPLICATION_JSON }
                                             else { mime::TEXT_HTML }))
-           .with_header(header::ContentLength(buf.len() as u64))
-           .with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
+           .with_header(header::ContentLength(len as u64))
+           .with_body(body))
     }

     fn camera_html(&self, db: MutexGuard<db::LockedDatabase>, query: Option<&str>,
@@ -398,10 +402,11 @@ impl Service {
                        -> Result<Response<slices::Body>, Error> {
         let r = Service::get_optional_range(query)?;
         if !is_json(req) {
+            let body: slices::Body = Box::new(stream::once(
+                Ok(ARefs::new(&b"only available for JSON requests"[..]))));
             return Ok(Response::new()
-                      .with_status(status::StatusCode::NotAcceptable)
-                      .with_body(stream::once(
-                          Ok(ARefs::new(&b"only available for JSON requests"[..]))).boxed()));
+                      .with_status(hyper::StatusCode::NotAcceptable)
+                      .with_body(body));
         }
         let mut out = json::ListRecordings{recordings: Vec::new()};
         {
@@ -423,10 +428,12 @@ impl Service {
             })?;
         }
         let buf = serde_json::to_vec(&out)?;
+        let len = buf.len();
+        let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(buf))));
         Ok(Response::new()
            .with_header(header::ContentType(mime::APPLICATION_JSON))
-           .with_header(header::ContentLength(buf.len() as u64))
-           .with_body(stream::once(Ok(ARefs::new(buf))).boxed()))
+           .with_header(header::ContentLength(len as u64))
+           .with_body(body))
     }

     fn camera_view_mp4(&self, uuid: Uuid, query: Option<&str>, req: &Request)
@@ -614,7 +621,6 @@ mod bench {
     use hyper;
     use self::test::Bencher;
-    use std::str;
     use testutil::{self, TestDb};

     struct Server {
@@ -653,8 +659,8 @@ mod bench {
         let mut buf = Vec::new();
         let client = reqwest::Client::new().unwrap();
         let mut f = || {
-            let mut resp = client.get(url.clone()).send().unwrap();
-            assert_eq!(*resp.status(), reqwest::StatusCode::Ok);
+            let mut resp = client.get(url.clone()).unwrap().send().unwrap();
+            assert_eq!(resp.status(), reqwest::StatusCode::Ok);
             buf.clear();
             use std::io::Read;
             resp.read_to_end(&mut buf).unwrap();
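The reqwest 0.6 to 0.7 bump shows up in both bench modules: `Client::get` now returns a `Result` that must be unwrapped before chaining, and `Response::status()` is compared by value rather than through a dereference. Roughly the calling pattern, with a placeholder URL and the same unwrap-heavy style the benches use:

    extern crate reqwest;

    use std::io::Read;

    fn fetch(url: &str) -> Vec<u8> {
        let client = reqwest::Client::new().unwrap();        // construction can fail in 0.7
        let mut resp = client.get(url).unwrap()              // 0.7: get() returns a Result
                             .send().unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::Ok);   // status() compared by value
        let mut buf = Vec::new();
        resp.read_to_end(&mut buf).unwrap();
        buf
    }

    fn main() {
        // Placeholder address; point this at a running server to try it.
        let body = fetch("http://127.0.0.1:8080/");
        println!("{} bytes", body.len());
    }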