reduce per-slice allocations

`slices::Slices::get_range` was too closely following the example of
`http_serve::Entity::get_range`:

The latter is once-per-request, so just boxing is low-cost and makes
sense to easily avoid monomorphization bloat when there are potentially
many types of entity streams in one program. In Moonfire, it's used with
different streams defined in the `moonfire_nvr::web::mp4`,
`moonfire_nvr::bundled_ui`, and `http_serve::file` modules. Putting them
all into a single boxless enum would be a pain. In particular, the last
one is not a nameable type today and would need more generic parameters
to implement the caller-demanded `Entity` definition.

The former is once-per-slice, there are tons of slices per request, and
it's easy to define a two-case enum right where it's needed. So the
trade-off is quite different.

Also fix up some out-of-date comments.
This commit is contained in:
Scott Lamb 2025-03-07 18:54:34 -08:00
parent 3cc9603ff3
commit 2985214d87
7 changed files with 64 additions and 60 deletions

9
server/Cargo.lock generated
View File

@ -1294,6 +1294,7 @@ dependencies = [
"nom",
"num-rational",
"password-hash",
"pin-project",
"pretty-hex",
"protobuf",
"reffers",
@ -1552,18 +1553,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pin-project"
version = "1.1.8"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.8"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",

View File

@ -84,6 +84,7 @@ flate2 = "1.0.26"
hyper-util = { version = "0.1.7", features = ["server-graceful", "tokio"] }
http-body = "1.0.1"
http-body-util = "0.1.2"
pin-project = "1.1.10"
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd = "0.7.0"

View File

@ -7,7 +7,7 @@
//! This mostly includes opening a directory and looking for recordings within it.
//! Updates to the directory happen through [crate::writer].
mod reader;
pub mod reader;
use crate::coding;
use crate::db::CompositeId;

View File

@ -61,10 +61,10 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::BytesMut;
use db::dir;
use db::recording::{self, rescale, TIME_UNITS_PER_SEC};
use futures::stream::{self, TryStreamExt};
use futures::Stream;
use http::header::HeaderValue;
use hyper::body::Buf;
use pin_project::pin_project;
use reffers::ARefss;
use smallvec::SmallVec;
use std::cmp;
@ -758,19 +758,37 @@ impl Slice {
}
}
#[pin_project(project = SliceStreamProj)]
enum SliceStream {
Once(Option<Result<Chunk, Error>>),
File(#[pin] db::dir::reader::FileStream),
}
impl futures::stream::Stream for SliceStream {
type Item = Result<Chunk, BoxedError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.project() {
SliceStreamProj::Once(o) => {
std::task::Poll::Ready(o.take().map(|r| r.map_err(wrap_error)))
}
SliceStreamProj::File(f) => f.poll_next(cx).map_ok(Chunk::from).map_err(wrap_error),
}
}
}
impl slices::Slice for Slice {
type Ctx = File;
type Chunk = Chunk;
type Stream = SliceStream;
fn end(&self) -> u64 {
self.0 & 0xFF_FF_FF_FF_FF
}
fn get_range(
&self,
f: &File,
range: Range<u64>,
len: u64,
) -> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Send + Sync> {
fn get_range(&self, f: &File, range: Range<u64>, len: u64) -> SliceStream {
trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len);
let p = self.p();
let res = match self.t() {
@ -802,22 +820,20 @@ impl slices::Slice for Slice {
SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize),
};
Box::new(stream::once(futures::future::ready(
res.map_err(wrap_error).and_then(move |c| {
if c.remaining() != (range.end - range.start) as usize {
return Err(wrap_error(err!(
Internal,
msg(
"{:?} range {:?} produced incorrect len {}",
self,
range,
c.remaining()
)
)));
}
Ok(c)
}),
)))
SliceStream::Once(Some(res.and_then(move |c| {
if c.remaining() != (range.end - range.start) as usize {
bail!(
Internal,
msg(
"{:?} range {:?} produced incorrect len {}",
self,
range,
c.remaining()
)
);
}
Ok(c)
})))
}
fn get_slices(ctx: &File) -> &Slices<Self> {
@ -1796,32 +1812,20 @@ impl FileInner {
.into())
}
/// Gets a `Chunk` of video sample data from disk.
/// This works by `mmap()`ing in the data. There are a couple caveats:
///
/// * The thread which reads the resulting slice is likely to experience major page faults.
/// Eventually this will likely be rewritten to `mmap()` the memory in another thread, and
/// `mlock()` and send chunks of it to be read and `munlock()`ed to avoid this problem.
///
/// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't
/// happen because nothing should be touching Moonfire NVR's files but itself.
fn get_video_sample_data(
&self,
i: usize,
r: Range<u64>,
) -> Box<dyn Stream<Item = Result<Chunk, BoxedError>> + Send + Sync> {
/// Gets a stream representing a range of segment `i`'s sample data from disk.
fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> SliceStream {
let s = &self.segments[i];
let sr = s.s.sample_file_range();
let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) {
None => {
return Box::new(stream::iter(std::iter::once(Err(wrap_error(err!(
return SliceStream::Once(Some(Err(err!(
NotFound,
msg("{}: stream not found", s.s.id)
))))))
))))
}
Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start)),
};
Box::new(f.map_ok(Chunk::from).map_err(wrap_error))
SliceStream::File(f)
}
fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, len: u64) -> Result<Chunk, Error> {

View File

@ -17,7 +17,8 @@ use tracing_futures::Instrument;
/// Each `Slice` instance belongs to a single `Slices`.
pub trait Slice: fmt::Debug + Sized + Sync + 'static {
type Ctx: Send + Sync + Clone;
type Chunk: Send + Sync;
type Chunk: Send + Sync + 'static;
type Stream: Stream<Item = Result<Self::Chunk, BoxedError>> + Send + Sync;
/// The byte position (relative to the start of the `Slices`) of the end of this slice,
/// exclusive. Note the starting position (and thus length) are inferred from the previous
@ -27,12 +28,10 @@ pub trait Slice: fmt::Debug + Sized + Sync + 'static {
/// Gets the body bytes indicated by `r`, which is relative to this slice's start.
/// The additional argument `ctx` is as supplied to the `Slices`.
/// The additional argument `l` is the length of this slice, as determined by the `Slices`.
fn get_range(
&self,
ctx: &Self::Ctx,
r: Range<u64>,
len: u64,
) -> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Sync + Send>;
///
/// Note that unlike [`http_entity::Entity::get_range`], this is called many times per request,
/// so it's worth defining a custom stream type to avoid allocation overhead.
fn get_range(&self, ctx: &Self::Ctx, r: Range<u64>, len: u64) -> Self::Stream;
fn get_slices(ctx: &Self::Ctx) -> &Slices<Self>;
}
@ -127,7 +126,7 @@ where
}
/// Writes `range` to `out`.
/// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument.
/// This interface mirrors `http_serve::Entity::get_range`, with the additional `ctx` argument.
pub fn get_range(
&self,
ctx: &S::Ctx,
@ -170,7 +169,7 @@ where
let l = s_end - slice_start;
body = s.get_range(&c, start_pos..min_end - slice_start, l);
};
futures::future::ready(Some((Pin::from(body), (c, i + 1, 0, min_end))))
futures::future::ready(Some((body, (c, i + 1, 0, min_end))))
},
);
Box::pin(bodies.flatten().in_current_span())
@ -182,7 +181,7 @@ mod tests {
use super::{Slice, Slices};
use crate::body::BoxedError;
use db::testutil;
use futures::stream::{self, Stream, TryStreamExt};
use futures::stream::{self, TryStreamExt};
use std::ops::Range;
use std::pin::Pin;
@ -201,6 +200,7 @@ mod tests {
impl Slice for FakeSlice {
type Ctx = &'static Slices<FakeSlice>;
type Chunk = FakeChunk;
type Stream = stream::Once<futures::future::Ready<Result<FakeChunk, BoxedError>>>;
fn end(&self) -> u64 {
self.end
@ -211,11 +211,11 @@ mod tests {
_ctx: &&'static Slices<FakeSlice>,
r: Range<u64>,
_l: u64,
) -> Box<dyn Stream<Item = Result<FakeChunk, BoxedError>> + Send + Sync> {
Box::new(stream::once(futures::future::ok(FakeChunk {
) -> Self::Stream {
stream::once(futures::future::ok(FakeChunk {
slice: self.name,
range: r,
})))
}))
}
fn get_slices(ctx: &&'static Slices<FakeSlice>) -> &'static Slices<Self> {

View File

@ -8,7 +8,6 @@ use futures::StreamExt;
use retina::client::Demuxed;
use retina::codec::CodecItem;
use std::pin::Pin;
use std::result::Result;
use tracing::Instrument;
use url::Url;

View File

@ -6,7 +6,6 @@ use crate::stream;
use base::clock::{Clocks, TimerGuard};
use base::{bail, err, Error};
use db::{dir, recording, writer, Camera, Database, Stream};
use std::result::Result;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{debug, info, trace, warn, Instrument};