diff --git a/server/Cargo.lock b/server/Cargo.lock index a6512ef..8e7ef8a 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1294,6 +1294,7 @@ dependencies = [ "nom", "num-rational", "password-hash", + "pin-project", "pretty-hex", "protobuf", "reffers", @@ -1552,18 +1553,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", diff --git a/server/Cargo.toml b/server/Cargo.toml index 1240745..74083d9 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -84,6 +84,7 @@ flate2 = "1.0.26" hyper-util = { version = "0.1.7", features = ["server-graceful", "tokio"] } http-body = "1.0.1" http-body-util = "0.1.2" +pin-project = "1.1.10" [target.'cfg(target_os = "linux")'.dependencies] libsystemd = "0.7.0" diff --git a/server/db/dir/mod.rs b/server/db/dir/mod.rs index 2e9fa0d..ab88614 100644 --- a/server/db/dir/mod.rs +++ b/server/db/dir/mod.rs @@ -7,7 +7,7 @@ //! This mostly includes opening a directory and looking for recordings within it. //! Updates to the directory happen through [crate::writer]. -mod reader; +pub mod reader; use crate::coding; use crate::db::CompositeId; diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 0ac1fa9..4e18b6e 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -61,10 +61,10 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::BytesMut; use db::dir; use db::recording::{self, rescale, TIME_UNITS_PER_SEC}; -use futures::stream::{self, TryStreamExt}; use futures::Stream; use http::header::HeaderValue; use hyper::body::Buf; +use pin_project::pin_project; use reffers::ARefss; use smallvec::SmallVec; use std::cmp; @@ -758,19 +758,37 @@ impl Slice { } } +#[pin_project(project = SliceStreamProj)] +enum SliceStream { + Once(Option>), + File(#[pin] db::dir::reader::FileStream), +} + +impl futures::stream::Stream for SliceStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + SliceStreamProj::Once(o) => { + std::task::Poll::Ready(o.take().map(|r| r.map_err(wrap_error))) + } + SliceStreamProj::File(f) => f.poll_next(cx).map_ok(Chunk::from).map_err(wrap_error), + } + } +} + impl slices::Slice for Slice { type Ctx = File; type Chunk = Chunk; + type Stream = SliceStream; fn end(&self) -> u64 { self.0 & 0xFF_FF_FF_FF_FF } - fn get_range( - &self, - f: &File, - range: Range, - len: u64, - ) -> Box> + Send + Sync> { + fn get_range(&self, f: &File, range: Range, len: u64) -> SliceStream { trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len); let p = self.p(); let res = match self.t() { @@ -802,22 +820,20 @@ impl slices::Slice for Slice { SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len), SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize), }; - Box::new(stream::once(futures::future::ready( - res.map_err(wrap_error).and_then(move |c| { - if c.remaining() != (range.end - range.start) as usize { - return Err(wrap_error(err!( - Internal, - msg( - "{:?} range {:?} produced incorrect len {}", - self, - range, - c.remaining() - ) - ))); - } - Ok(c) - }), - ))) + SliceStream::Once(Some(res.and_then(move |c| { + if c.remaining() != (range.end - range.start) as usize { + bail!( + Internal, + msg( + "{:?} range {:?} produced incorrect len {}", + self, + range, + c.remaining() + ) + ); + } + Ok(c) + }))) } fn get_slices(ctx: &File) -> &Slices { @@ -1796,32 +1812,20 @@ impl FileInner { .into()) } - /// Gets a `Chunk` of video sample data from disk. - /// This works by `mmap()`ing in the data. There are a couple caveats: - /// - /// * The thread which reads the resulting slice is likely to experience major page faults. - /// Eventually this will likely be rewritten to `mmap()` the memory in another thread, and - /// `mlock()` and send chunks of it to be read and `munlock()`ed to avoid this problem. - /// - /// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't - /// happen because nothing should be touching Moonfire NVR's files but itself. - fn get_video_sample_data( - &self, - i: usize, - r: Range, - ) -> Box> + Send + Sync> { + /// Gets a stream representing a range of segment `i`'s sample data from disk. + fn get_video_sample_data(&self, i: usize, r: Range) -> SliceStream { let s = &self.segments[i]; let sr = s.s.sample_file_range(); let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) { None => { - return Box::new(stream::iter(std::iter::once(Err(wrap_error(err!( + return SliceStream::Once(Some(Err(err!( NotFound, msg("{}: stream not found", s.s.id) - )))))) + )))) } Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start)), }; - Box::new(f.map_ok(Chunk::from).map_err(wrap_error)) + SliceStream::File(f) } fn get_subtitle_sample_data(&self, i: usize, r: Range, len: u64) -> Result { diff --git a/server/src/slices.rs b/server/src/slices.rs index e380813..7111226 100644 --- a/server/src/slices.rs +++ b/server/src/slices.rs @@ -17,7 +17,8 @@ use tracing_futures::Instrument; /// Each `Slice` instance belongs to a single `Slices`. pub trait Slice: fmt::Debug + Sized + Sync + 'static { type Ctx: Send + Sync + Clone; - type Chunk: Send + Sync; + type Chunk: Send + Sync + 'static; + type Stream: Stream> + Send + Sync; /// The byte position (relative to the start of the `Slices`) of the end of this slice, /// exclusive. Note the starting position (and thus length) are inferred from the previous @@ -27,12 +28,10 @@ pub trait Slice: fmt::Debug + Sized + Sync + 'static { /// Gets the body bytes indicated by `r`, which is relative to this slice's start. /// The additional argument `ctx` is as supplied to the `Slices`. /// The additional argument `l` is the length of this slice, as determined by the `Slices`. - fn get_range( - &self, - ctx: &Self::Ctx, - r: Range, - len: u64, - ) -> Box> + Sync + Send>; + /// + /// Note that unlike [`http_entity::Entity::get_range`], this is called many times per request, + /// so it's worth defining a custom stream type to avoid allocation overhead. + fn get_range(&self, ctx: &Self::Ctx, r: Range, len: u64) -> Self::Stream; fn get_slices(ctx: &Self::Ctx) -> &Slices; } @@ -127,7 +126,7 @@ where } /// Writes `range` to `out`. - /// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument. + /// This interface mirrors `http_serve::Entity::get_range`, with the additional `ctx` argument. pub fn get_range( &self, ctx: &S::Ctx, @@ -170,7 +169,7 @@ where let l = s_end - slice_start; body = s.get_range(&c, start_pos..min_end - slice_start, l); }; - futures::future::ready(Some((Pin::from(body), (c, i + 1, 0, min_end)))) + futures::future::ready(Some((body, (c, i + 1, 0, min_end)))) }, ); Box::pin(bodies.flatten().in_current_span()) @@ -182,7 +181,7 @@ mod tests { use super::{Slice, Slices}; use crate::body::BoxedError; use db::testutil; - use futures::stream::{self, Stream, TryStreamExt}; + use futures::stream::{self, TryStreamExt}; use std::ops::Range; use std::pin::Pin; @@ -201,6 +200,7 @@ mod tests { impl Slice for FakeSlice { type Ctx = &'static Slices; type Chunk = FakeChunk; + type Stream = stream::Once>>; fn end(&self) -> u64 { self.end @@ -211,11 +211,11 @@ mod tests { _ctx: &&'static Slices, r: Range, _l: u64, - ) -> Box> + Send + Sync> { - Box::new(stream::once(futures::future::ok(FakeChunk { + ) -> Self::Stream { + stream::once(futures::future::ok(FakeChunk { slice: self.name, range: r, - }))) + })) } fn get_slices(ctx: &&'static Slices) -> &'static Slices { diff --git a/server/src/stream.rs b/server/src/stream.rs index 101e010..cfc366e 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -8,7 +8,6 @@ use futures::StreamExt; use retina::client::Demuxed; use retina::codec::CodecItem; use std::pin::Pin; -use std::result::Result; use tracing::Instrument; use url::Url; diff --git a/server/src/streamer.rs b/server/src/streamer.rs index 6213360..7d73235 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -6,7 +6,6 @@ use crate::stream; use base::clock::{Clocks, TimerGuard}; use base::{bail, err, Error}; use db::{dir, recording, writer, Camera, Database, Stream}; -use std::result::Result; use std::str::FromStr; use std::sync::Arc; use tracing::{debug, info, trace, warn, Instrument};