diff --git a/guide/troubleshooting.md b/guide/troubleshooting.md index 17a6049..33ac6ba 100644 --- a/guide/troubleshooting.md +++ b/guide/troubleshooting.md @@ -85,12 +85,17 @@ Moonfire NVR names a few important thread types as follows: * `main`: during `moonfire-nvr run`, the main thread does initial setup then just waits for the other threads. In other subcommands, it does everything. -* `s-CAMERA-TYPE`: there is one of these threads for every recorded stream - (up to two per camera, where `TYPE` is `main` or `sub`). These threads read - frames from the cameras via RTSP and write them to disk. -* `sync-PATH`: there is one of these threads for every sample file directory. - These threads call `fsync` to commit sample files to disk, delete old sample - files, and flush the database. +* `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main` or `sub`): + These threads read frames from the cameras via RTSP and write them to disk. +* `sync-PATH` (one per sample file directory): These threads call `fsync` to + commit sample files to disk, delete old sample files, and flush the + database. +* `r-PATH` (one per sample file directory): These threads read sample files + from disk for serving `.mp4` files. +* `tokio-runtime-worker` (one per core): These threads handle HTTP requests. +* `logger`: this thread writes the log buffer to `stderr`. Logging is + asynchronous; other threads don't wait for log messages to be written + unless the log buffer is full. 
You can use the following command to teach [`lnav`](http://lnav.org/) Moonfire NVR's log format: diff --git a/server/Cargo.lock b/server/Cargo.lock index 8694ebe..25f9664 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1126,16 +1126,6 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "mime" version = "0.3.16" @@ -1199,6 +1189,7 @@ dependencies = [ "cstr", "failure", "fnv", + "futures", "h264-reader", "hashlink", "itertools", @@ -1220,6 +1211,7 @@ dependencies = [ "smallvec", "tempfile", "time", + "tokio", "uuid", ] @@ -1257,7 +1249,6 @@ dependencies = [ "libc", "log", "memchr", - "memmap", "moonfire-base", "moonfire-db", "moonfire-ffmpeg", diff --git a/server/Cargo.toml b/server/Cargo.toml index 6794377..552d851 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -35,13 +35,12 @@ futures = "0.3" fnv = "1.0" h264-reader = { git = "https://github.com/dholroyd/h264-reader" } http = "0.2.3" -http-serve = { version = "0.3.0", features = ["dir"] } +http-serve = { version = "0.3.1", features = ["dir"] } hyper = { version = "0.14.2", features = ["http1", "server", "stream", "tcp"] } lazy_static = "1.0" libc = "0.2" log = { version = "0.4", features = ["release_max_level_info"] } memchr = "2.0.2" -memmap = "0.7" moonfire-tflite = { git = "https://github.com/scottlamb/moonfire-tflite", features = ["edgetpu"], optional = true } mylog = { git = "https://github.com/scottlamb/mylog" } nix = "0.20.0" diff --git a/server/db/Cargo.toml b/server/db/Cargo.toml index 64b39fe..fa93e1d 100644 --- a/server/db/Cargo.toml +++ b/server/db/Cargo.toml @@ -20,6 +20,7 @@ byteorder = "1.0" cstr = "0.2.5" failure = 
"0.1.1" fnv = "1.0" +futures = "0.3" h264-reader = { git = "https://github.com/dholroyd/h264-reader" } hashlink = "0.7.0" lazy_static = "1.0" @@ -38,6 +39,7 @@ rusqlite = "0.25.3" smallvec = "1.0" tempfile = "3.2.0" time = "0.1" +tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "sync"] } uuid = { version = "0.8", features = ["std", "v4"] } itertools = "0.10.0" diff --git a/server/db/dir.rs b/server/db/dir/mod.rs similarity index 96% rename from server/db/dir.rs rename to server/db/dir/mod.rs index e84fa5e..1b3ade7 100644 --- a/server/db/dir.rs +++ b/server/db/dir/mod.rs @@ -1,12 +1,14 @@ // This file is part of Moonfire NVR, a security camera network video recorder. -// Copyright (C) 2018 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. -// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. +// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. +// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception //! Sample file directory management. //! //! This mostly includes opening a directory and looking for recordings within it. //! Updates to the directory happen through [crate::writer]. +mod reader; + use crate::coding; use crate::db::CompositeId; use crate::schema; @@ -23,6 +25,7 @@ use protobuf::Message; use std::ffi::CStr; use std::fs; use std::io::{Read, Write}; +use std::ops::Range; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::Arc; @@ -44,7 +47,9 @@ pub struct SampleFileDir { /// [crate::writer::start_syncer] uses it to create files and sync the /// directory. Other threads use it to open sample files for reading during /// video serving. - pub(crate) fd: Fd, + pub(crate) fd: Arc, + + reader: reader::Reader, } /// The on-disk filename of a recording file within the sample file directory. 
@@ -307,14 +312,17 @@ impl SampleFileDir { } fn open_self(path: &str, create: bool) -> Result, Error> { - let fd = Fd::open(path, create)?; - Ok(Arc::new(SampleFileDir { fd })) + let fd = Arc::new(Fd::open(path, create)?); + let reader = reader::Reader::spawn(path, fd.clone()); + Ok(Arc::new(SampleFileDir { + fd, + reader, + })) } /// Opens the given sample file for reading. - pub fn open_file(&self, composite_id: CompositeId) -> Result { - let p = CompositeIdPath::from(composite_id); - crate::fs::openat(self.fd.0, &p, OFlag::O_RDONLY, Mode::empty()) + pub fn open_file(&self, composite_id: CompositeId, range: Range) -> reader::FileStream { + self.reader.open_file(composite_id, range) } pub fn create_file(&self, composite_id: CompositeId) -> Result { diff --git a/server/db/dir/reader.rs b/server/db/dir/reader.rs new file mode 100644 index 0000000..e73a149 --- /dev/null +++ b/server/db/dir/reader.rs @@ -0,0 +1,372 @@ +// This file is part of Moonfire NVR, a security camera network video recorder. +// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. +// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception + +//! Reads sample files in a dedicated thread. +//! +//! Typically sample files are on spinning disk where IO operations take +//! ~10 ms on success. When disks fail, operations can stall for arbitrarily +//! long. POSIX doesn't have good support for asynchronous disk IO, +//! so it's desirable to do this from a dedicated thread for each disk rather +//! than stalling the tokio IO threads or (as when using `tokio::fs`) creating +//! unbounded numbers of workers. +//! +//! This also has some minor theoretical efficiency advantages over +//! `tokio::fs::File`: +//! * it uses `mmap`, which means fewer system calls and a somewhat faster +//! userspace `memcpy` implementation (see [Why mmap is faster than system +//! calls](https://sasha-f.medium.com/why-mmap-is-faster-than-system-calls-24718e75ab37).) +//! 
* it has fewer thread handoffs because it batches operations on open +//! (open, fstat, mmap, madvise, close, memcpy first chunk) and close +//! (memcpy last chunk, munmap). + +use std::convert::TryFrom; +use std::os::unix::prelude::AsRawFd; +use std::{ops::Range, pin::Pin, sync::Arc, task::{Context, Poll}}; +use std::future::Future; + +use base::bail_t; +use base::clock::{RealClocks, TimerGuard}; +use base::{Error, ErrorKind, ResultExt, format_err_t}; +use nix::{fcntl::OFlag, sys::stat::Mode}; + +use crate::CompositeId; + +/// Handle for a reader thread, used to send it commands. +/// +/// The reader will shut down after the last handle is closed. +#[derive(Clone, Debug)] +pub(super) struct Reader( + // This is an unbounded channel because FileStream::drop can't block. + // There is backpressure elsewhere: typically the caller will await the + // open/read reply before sending another request. + tokio::sync::mpsc::UnboundedSender +); + +impl Reader { + pub(super) fn spawn(path: &str, dir: Arc) -> Self { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let page_size = usize::try_from( + nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE) + .expect("PAGE_SIZE fetch must succeed") + .expect("PAGE_SIZE must be defined") + ).expect("PAGE_SIZE fits in usize"); + assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size); + std::thread::Builder::new() + .name(format!("r-{}", path)) + .spawn(move || ReaderInt { dir, page_size }.run(rx)) + .expect("unable to create reader thread"); + Self(tx) + } + + pub(super) fn open_file(&self, composite_id: CompositeId, range: Range) -> FileStream { + if range.is_empty() { + return FileStream { + state: FileStreamState::Invalid, + reader: Reader(self.0.clone()), + } + } + let (tx, rx) = tokio::sync::oneshot::channel(); + self.send(ReaderCommand::OpenFile { + composite_id, + range, + tx, + }); + FileStream { + state: FileStreamState::Reading(rx), + reader: Reader(self.0.clone()), + } + } + + fn send(&self, 
cmd: ReaderCommand) { + self.0.send(cmd).map_err(|_| ()).expect("reader thread panicked; see logs."); + } +} + +pub struct FileStream { + state: FileStreamState, + reader: Reader, +} + +type ReadReceiver = tokio::sync::oneshot::Receiver, Vec), Error>>; + +enum FileStreamState { + Idle(OpenFile), + Reading(ReadReceiver), + Invalid, +} + +impl FileStream { + /// Helper for reading during `poll_next`. + fn read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut rx: ReadReceiver, + ) -> Poll, Error>>> { + match Pin::new(&mut rx).poll(cx) { + Poll::Ready(Err(_)) => { + self.state = FileStreamState::Invalid; + Poll::Ready(Some(Err(format_err_t!(Internal, "reader thread panicked; see logs")))) + }, + Poll::Ready(Ok(Err(e))) => { + self.state = FileStreamState::Invalid; + Poll::Ready(Some(Err(e))) + }, + Poll::Ready(Ok(Ok((Some(file), chunk)))) => { + self.state = FileStreamState::Idle(file); + Poll::Ready(Some(Ok(chunk))) + }, + Poll::Ready(Ok(Ok((None, chunk)))) => { + self.state = FileStreamState::Invalid; + Poll::Ready(Some(Ok(chunk))) + }, + Poll::Pending => { + self.state = FileStreamState::Reading(rx); + Poll::Pending + }, + } + } +} + +impl futures::stream::Stream for FileStream { + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match std::mem::replace(&mut self.state, FileStreamState::Invalid) { + FileStreamState::Idle(file) => { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.reader.send(ReaderCommand::ReadNextChunk { + file, + tx, + }); + + // Try reading right away. It probably will return pending, but Receiver + // needs to see the waker. + self.read(cx, rx) + }, + FileStreamState::Reading(rx) => self.read(cx, rx), + FileStreamState::Invalid => Poll::Ready(None), + } + } +} + +impl Drop for FileStream { + fn drop(&mut self) { + use FileStreamState::{Idle, Invalid}; + if let Idle(file) = std::mem::replace(&mut self.state, Invalid) { + // This will succeed unless reader has panicked. 
If that happened, + // the logfiles will be loud anyway; no need to add additional + // error messages. + let _ = self.reader.0.send(ReaderCommand::CloseFile(file)); + } + } +} + +/// An open, `mmap()`ed file. +/// +/// This is only actually used by the reader thread, but ownership is passed +/// around between it and the [FileStream] to avoid maintaining extra data +/// structures. +struct OpenFile { + composite_id: CompositeId, + + /// The memory-mapped region backed by the file. Valid up to length `map_len`. + map_ptr: *mut libc::c_void, + + /// The position within the memory mapping. Invariant: `map_pos < map_len`. + map_pos: usize, + + /// The length of the memory mapping. This may be less than the length of + /// the file. + map_len: usize, +} + +// Rust makes us manually state these because of the `*mut` ptr above. +unsafe impl Send for OpenFile {} +unsafe impl Sync for OpenFile {} + +impl Drop for OpenFile { + fn drop(&mut self) { + if let Err(e) = unsafe { + nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len) + } { + // This should never happen. + log::error!("unable to munmap {}, {:?} len {}: {}", + self.composite_id, self.map_ptr, self.map_len, e); + } + } +} + +enum ReaderCommand { + /// Opens a file and reads the first chunk. + OpenFile { + composite_id: CompositeId, + range: std::ops::Range, + tx: tokio::sync::oneshot::Sender, Vec), Error>>, + }, + + /// Reads the next chunk of the file. + ReadNextChunk { + file: OpenFile, + tx: tokio::sync::oneshot::Sender, Vec), Error>>, + }, + + /// Closes the file early, as when the [FileStream] is dropped before completing. + CloseFile(OpenFile), +} + +struct ReaderInt { + /// File descriptor of the sample file directory. + dir: Arc, + + /// The page size as returned by `sysconf`; guaranteed to be a power of two. 
+ page_size: usize, +} + +impl ReaderInt { + fn run(self, mut rx: tokio::sync::mpsc::UnboundedReceiver) { + while let Some(cmd) = rx.blocking_recv() { + // OpenFile's Drop implementation takes care of closing the file on error paths and + // the CloseFile operation. + match cmd { + ReaderCommand::OpenFile { composite_id, range, tx } => { + if tx.is_closed() { // avoid spending effort on expired commands + continue; + } + let _guard = TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id)); + let _ = tx.send(self.open(composite_id, range)); + }, + ReaderCommand::ReadNextChunk { file, tx } => { + if tx.is_closed() { // avoid spending effort on expired commands + continue; + } + let composite_id = file.composite_id; + let _guard = TimerGuard::new( + &RealClocks {}, + || format!("read from {}", composite_id), + ); + let _ = tx.send(Ok(self.chunk(file))); + }, + ReaderCommand::CloseFile(_) => {}, + } + } + } + + fn open( + &self, + composite_id: CompositeId, + range: Range + ) -> Result<(Option, Vec), Error> { + let p = super::CompositeIdPath::from(composite_id); + + // Reader::open_file checks for an empty range, but check again right + // before the unsafe block to make it easier to audit the safety constraints. + assert!(range.start < range.end); + + // mmap offsets must be aligned to page size boundaries. + let unaligned = (range.start as usize) & (self.page_size - 1); + let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t") - + libc::off_t::try_from(unaligned).expect("usize fits in off_t"); + + // Recordings from very high bitrate streams could theoretically exhaust a 32-bit + // machine's address space, causing either this usize::MAX error or mmap + // failure. If that happens in practice, we'll have to stop mmap()ing + // the whole range. 
+ let map_len = usize::try_from( + range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64")) + .map_err(|_| format_err_t!( + OutOfRange, "file {}'s range {:?} len exceeds usize::MAX", + composite_id, range))?; + + let file = crate::fs::openat(self.dir.0, &p, OFlag::O_RDONLY, Mode::empty()) + .err_kind(ErrorKind::Unknown)?; + + // Check the actual on-disk file length. It's an error (a bug or filesystem corruption) + // for it to be less than the requested read. Check for this now rather than crashing + // with a SIGBUS or reading bad data at the end of the last page later. + let metadata = file.metadata().err_kind(ErrorKind::Unknown)?; + if metadata.len() < u64::try_from(offset).unwrap() + u64::try_from(map_len).unwrap() { + bail_t!(Internal, "file {}, range {:?}, len {}", composite_id, range, metadata.len()); + } + let map_ptr = unsafe { + nix::sys::mman::mmap( + std::ptr::null_mut(), + map_len, + nix::sys::mman::ProtFlags::PROT_READ, + nix::sys::mman::MapFlags::MAP_SHARED, + file.as_raw_fd(), + offset + ) + }.map_err(|e| format_err_t!(Internal, "mmap failed for {} off={} len={}: {}", + composite_id, offset, map_len, e))?; + + if let Err(e) = unsafe { + nix::sys::mman::madvise( + map_ptr as *mut libc::c_void, + map_len, + nix::sys::mman::MmapAdvise::MADV_SEQUENTIAL, + ) + } { + // This shouldn't happen but is "just" a performance problem. + log::warn!("madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}", + composite_id, offset, map_len, e); + } + + Ok(self.chunk(OpenFile { + composite_id, + map_ptr, + map_pos: unaligned, + map_len, + })) + } + + fn chunk(&self, mut file: OpenFile) -> (Option, Vec) { + // Read a chunk that's large enough to minimize thread handoffs but + // short enough to keep memory usage under control. It's hopefully + // unnecessary to worry about disk seeks; the madvise call should cause + // the kernel to read ahead. 
+ let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1<<16)); + let mut chunk = Vec::new(); + let len = end.checked_sub(file.map_pos).unwrap(); + chunk.reserve_exact(len); + + // SAFETY: [map_pos, map_pos + len) is verified to be within map_ptr. + // + // If the read is out of bounds of the file, we'll get a SIGBUS. + // That's not a safety violation. It also shouldn't happen because the + // length was set properly at open time, Moonfire NVR is a closed + // system (nothing else ever touches its files), and sample files are + // never truncated (only appended to or unlinked). + unsafe { + std::ptr::copy_nonoverlapping( + file.map_ptr.add(file.map_pos) as *const u8, + chunk.as_mut_ptr(), + len, + ); + chunk.set_len(len); + } + let file = if end == file.map_len { + None + } else { + file.map_pos = end; + Some(file) + }; + (file, chunk) + } +} + +#[cfg(test)] +mod tests { + use futures::TryStreamExt; + + #[tokio::test] + async fn basic() { + crate::testutil::init(); + let tmpdir = tempfile::Builder::new().prefix("moonfire-db-test-reader").tempdir().unwrap(); + let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap()); + let reader = super::Reader::spawn("/path/goes/here", fd); + std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap(); + let f = reader.open_file(crate::CompositeId(0x01234567_89abcdef), 1..8); + assert_eq!(f.try_concat().await.unwrap(), b"lah bla"); + } +} diff --git a/server/src/body.rs b/server/src/body.rs index 257222e..490aa7c 100644 --- a/server/src/body.rs +++ b/server/src/body.rs @@ -1,8 +1,18 @@ // This file is part of Moonfire NVR, a security camera network video recorder. -// Copyright (C) 2018 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. -// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. +// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. 
+// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception -//! Tools for implementing a `http_serve::Entity` body composed from many "slices". +//! HTTP body implementation using `ARefss<'static, [u8]>` chunks. +//! +//! Moonfire NVR uses this custom chunk type rather than [bytes::Bytes]. This +//! is mostly for historical reasons: we used to use `mmap`-backed chunks. +//! The custom chunk type also helps minimize reference-counting in `mp4::File` +//! as described [here](https://github.com/tokio-rs/bytes/issues/359#issuecomment-640812016), +//! although this is a pretty small optimization. +//! +//! Some day I expect [bytes::Bytes] will expose its vtable (see link above), +//! allowing us to minimize reference-counting while using the standard +//! [hyper::Body]. use base::Error; use futures::{stream, Stream}; diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 976c239..abdc2b4 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -1,6 +1,6 @@ // This file is part of Moonfire NVR, a security camera network video recorder. // Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. -// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. +// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception //! `.mp4` virtual file serving. //! 
@@ -61,7 +61,7 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::BytesMut; use db::dir; use db::recording::{self, rescale, TIME_UNITS_PER_SEC}; -use futures::stream; +use futures::stream::{self, TryStreamExt}; use futures::Stream; use http::header::HeaderValue; use hyper::body::Buf; @@ -801,7 +801,7 @@ impl slices::Slice for Slice { SliceType::Stsz => self.wrap_index(f, range.clone(), len, &Segment::stsz), SliceType::Stss => self.wrap_index(f, range.clone(), len, &Segment::stss), SliceType::Co64 => f.0.get_co64(range.clone(), len), - SliceType::VideoSampleData => f.0.get_video_sample_data(p, range.clone()), + SliceType::VideoSampleData => return f.0.get_video_sample_data(p, range.clone()), SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len), SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize), }; @@ -1799,24 +1799,19 @@ impl FileInner { /// /// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't /// happen because nothing should be touching Moonfire NVR's files but itself. - fn get_video_sample_data(&self, i: usize, r: Range) -> Result { + fn get_video_sample_data(&self, i: usize, r: Range) -> Box> + Send + Sync> { let s = &self.segments[i]; - let f = self - .dirs_by_stream_id - .get(&s.s.id.stream()) - .ok_or_else(|| format_err_t!(NotFound, "{}: stream not found", s.s.id))? - .open_file(s.s.id) - .err_kind(ErrorKind::Unknown)?; - let start = s.s.sample_file_range().start + r.start; - let mmap = Box::new(unsafe { - memmap::MmapOptions::new() - .offset(start) - .len((r.end - r.start) as usize) - .map(&f) - .err_kind(ErrorKind::Internal)? 
- }); - use core::ops::Deref; - Ok(ARefss::new(mmap).map(|m| m.deref()).into()) + let sr = s.s.sample_file_range(); + let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) { + None => return Box::new(stream::iter(std::iter::once(Err( + wrap_error(format_err_t!(NotFound, "{}: stream not found", s.s.id)) + )))), + Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start)), + }; + Box::new(f + .map_ok(Chunk::from) + .map_err(wrap_error) + ) } fn get_subtitle_sample_data(&self, i: usize, r: Range, len: u64) -> Result {