mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2025-11-10 05:59:44 -05:00
read sample files from dedicated threads
Reading from the mmap()ed region in the tokio threads could cause
them to stall:
* That could affect UI serving when there were concurrent
  UI requests (i.e., not just requests that needed the reads in
  question anyway).
* If there's a faulty disk, it could cause the UI to totally hang.
  Better to not mix disks between threads.
* Soon, I want to handle RTSP from the tokio threads (#37). Similarly,
  we don't want RTSP streaming to block on operations from unrelated
  disks.
I went with just one thread per disk which I think is sufficient.
But it'd be possible to do a fixed-size pool instead which might improve
latency when some pages are already cached.
I also dropped the memmap dependency. I had to compute the page
alignment anyway to get mremap to work, and Moonfire NVR already is
Unix-specific, so there wasn't much value from the memmap or memmap2
crates.
Fixes #88
This commit is contained in:
419
server/db/dir/mod.rs
Normal file
419
server/db/dir/mod.rs
Normal file
@@ -0,0 +1,419 @@
|
||||
// This file is part of Moonfire NVR, a security camera network video recorder.
|
||||
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
|
||||
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception
|
||||
|
||||
//! Sample file directory management.
//!
//! This mostly includes opening a directory and looking for recordings within it.
//! Updates to the directory happen through [crate::writer].
|
||||
|
||||
mod reader;
|
||||
|
||||
use crate::coding;
|
||||
use crate::db::CompositeId;
|
||||
use crate::schema;
|
||||
use cstr::cstr;
|
||||
use failure::{bail, format_err, Error, Fail};
|
||||
use log::warn;
|
||||
use nix::sys::statvfs::Statvfs;
|
||||
use nix::{
|
||||
fcntl::{FlockArg, OFlag},
|
||||
sys::stat::Mode,
|
||||
NixPath,
|
||||
};
|
||||
use protobuf::Message;
|
||||
use std::ffi::CStr;
|
||||
use std::fs;
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// The fixed length of a directory's `meta` file.
///
/// `read_meta` rejects a `meta` file of any other size, and `write_meta`
/// zero-pads the serialized message up to this length.
///
/// See `DirMeta` comments within `proto/schema.proto` for more explanation.
const FIXED_DIR_META_LEN: usize = 512;
|
||||
|
||||
/// A sample file directory. Typically one per physical disk drive.
|
||||
///
|
||||
/// If the directory is used for writing, [crate::writer::start_syncer] should be
|
||||
/// called to start a background thread. This thread manages deleting files and
|
||||
/// writing new files. It synces the directory and commits these operations to
|
||||
/// the database in the correct order to maintain the invariants described in
|
||||
/// `design/schema.md`.
|
||||
#[derive(Debug)]
|
||||
pub struct SampleFileDir {
|
||||
/// The open file descriptor for the directory. The worker created by
|
||||
/// [crate::writer::start_syncer] uses it to create files and sync the
|
||||
/// directory. Other threads use it to open sample files for reading during
|
||||
/// video serving.
|
||||
pub(crate) fd: Arc<Fd>,
|
||||
|
||||
reader: reader::Reader,
|
||||
}
|
||||
|
||||
/// The on-disk filename of a recording file within the sample file directory.
///
/// This is the [`CompositeId`](crate::db::CompositeId) as 16 hexadecimal
/// digits. It's NUL-terminated (hence 17 bytes) so it can be passed to system
/// calls without copying.
pub(crate) struct CompositeIdPath([u8; 17]);
|
||||
|
||||
impl CompositeIdPath {
|
||||
pub(crate) fn from(id: CompositeId) -> Self {
|
||||
let mut buf = [0u8; 17];
|
||||
write!(&mut buf[..16], "{:016x}", id.0).expect("can't format id to pathname buf");
|
||||
CompositeIdPath(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl NixPath for CompositeIdPath {
|
||||
fn is_empty(&self) -> bool {
|
||||
false
|
||||
}
|
||||
fn len(&self) -> usize {
|
||||
16
|
||||
}
|
||||
|
||||
fn with_nix_path<T, F>(&self, f: F) -> Result<T, nix::Error>
|
||||
where
|
||||
F: FnOnce(&CStr) -> T,
|
||||
{
|
||||
let p = CStr::from_bytes_with_nul(&self.0[..]).expect("no interior nuls");
|
||||
Ok(f(p))
|
||||
}
|
||||
}
|
||||
|
||||
/// A file descriptor associated with a directory (not necessarily the sample file dir).
///
/// The descriptor is closed when the `Fd` is dropped.
#[derive(Debug)]
pub struct Fd(std::os::unix::io::RawFd);
|
||||
|
||||
impl std::os::unix::io::AsRawFd for Fd {
|
||||
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Fd {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = nix::unistd::close(self.0) {
|
||||
warn!("Unable to close sample file dir: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Fd {
|
||||
/// Opens the given path as a directory.
|
||||
pub fn open<P: ?Sized + NixPath>(path: &P, mkdir: bool) -> Result<Fd, nix::Error> {
|
||||
if mkdir {
|
||||
match nix::unistd::mkdir(path, nix::sys::stat::Mode::S_IRWXU) {
|
||||
Ok(()) | Err(nix::Error::Sys(nix::errno::Errno::EEXIST)) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
let fd = nix::fcntl::open(path, OFlag::O_DIRECTORY | OFlag::O_RDONLY, Mode::empty())?;
|
||||
Ok(Fd(fd))
|
||||
}
|
||||
|
||||
/// `fsync`s this directory, causing all file metadata to be committed to permanent storage.
|
||||
pub(crate) fn sync(&self) -> Result<(), nix::Error> {
|
||||
nix::unistd::fsync(self.0)
|
||||
}
|
||||
|
||||
/// Locks the directory with the specified `flock` operation.
|
||||
pub fn lock(&self, arg: FlockArg) -> Result<(), nix::Error> {
|
||||
nix::fcntl::flock(self.0, arg)
|
||||
}
|
||||
|
||||
/// Returns information about the filesystem on which this directory lives.
|
||||
pub fn statfs(&self) -> Result<nix::sys::statvfs::Statvfs, nix::Error> {
|
||||
nix::sys::statvfs::fstatvfs(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads `dir`'s metadata. If none is found, returns an empty proto.
|
||||
pub(crate) fn read_meta(dir: &Fd) -> Result<schema::DirMeta, Error> {
|
||||
let mut meta = schema::DirMeta::default();
|
||||
let mut f = match crate::fs::openat(dir.0, cstr!("meta"), OFlag::O_RDONLY, Mode::empty()) {
|
||||
Err(e) => {
|
||||
if e == nix::Error::Sys(nix::errno::Errno::ENOENT) {
|
||||
return Ok(meta);
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
Ok(f) => f,
|
||||
};
|
||||
let mut data = Vec::new();
|
||||
f.read_to_end(&mut data)?;
|
||||
let (len, pos) = coding::decode_varint32(&data, 0)
|
||||
.map_err(|_| format_err!("Unable to decode varint length in meta file"))?;
|
||||
if data.len() != FIXED_DIR_META_LEN || len as usize + pos > FIXED_DIR_META_LEN {
|
||||
bail!(
|
||||
"Expected a {}-byte file with a varint length of a DirMeta message; got \
|
||||
a {}-byte file with length {}",
|
||||
FIXED_DIR_META_LEN,
|
||||
data.len(),
|
||||
len
|
||||
);
|
||||
}
|
||||
let data = &data[pos..pos + len as usize];
|
||||
let mut s = protobuf::CodedInputStream::from_bytes(&data);
|
||||
meta.merge_from(&mut s)
|
||||
.map_err(|e| e.context("Unable to parse metadata proto"))?;
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
/// Writes `dirfd`'s metadata, clobbering existing data.
|
||||
pub(crate) fn write_meta(dirfd: RawFd, meta: &schema::DirMeta) -> Result<(), Error> {
|
||||
let mut data = meta
|
||||
.write_length_delimited_to_bytes()
|
||||
.expect("proto3->vec is infallible");
|
||||
if data.len() > FIXED_DIR_META_LEN {
|
||||
bail!(
|
||||
"Length-delimited DirMeta message requires {} bytes, over limit of {}",
|
||||
data.len(),
|
||||
FIXED_DIR_META_LEN
|
||||
);
|
||||
}
|
||||
data.resize(FIXED_DIR_META_LEN, 0); // pad to required length.
|
||||
let mut f = crate::fs::openat(
|
||||
dirfd,
|
||||
cstr!("meta"),
|
||||
OFlag::O_CREAT | OFlag::O_WRONLY,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR,
|
||||
)
|
||||
.map_err(|e| e.context("Unable to open meta file"))?;
|
||||
let stat = f
|
||||
.metadata()
|
||||
.map_err(|e| e.context("Unable to stat meta file"))?;
|
||||
if stat.len() == 0 {
|
||||
// Need to sync not only the data but also the file metadata and dirent.
|
||||
f.write_all(&data)
|
||||
.map_err(|e| e.context("Unable to write to meta file"))?;
|
||||
f.sync_all()
|
||||
.map_err(|e| e.context("Unable to sync meta file"))?;
|
||||
nix::unistd::fsync(dirfd).map_err(|e| e.context("Unable to sync dir"))?;
|
||||
} else if stat.len() == FIXED_DIR_META_LEN as u64 {
|
||||
// Just syncing the data will suffice; existing metadata and dirent are fine.
|
||||
f.write_all(&data)
|
||||
.map_err(|e| e.context("Unable to write to meta file"))?;
|
||||
f.sync_data()
|
||||
.map_err(|e| e.context("Unable to sync meta file"))?;
|
||||
} else {
|
||||
bail!(
|
||||
"Existing meta file is {}-byte; expected {}",
|
||||
stat.len(),
|
||||
FIXED_DIR_META_LEN
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl SampleFileDir {
|
||||
/// Opens the directory using the given metadata.
|
||||
///
|
||||
/// `db_meta.in_progress_open` should be filled if the directory should be opened in read/write
|
||||
/// mode; absent in read-only mode.
|
||||
pub fn open(path: &str, db_meta: &schema::DirMeta) -> Result<Arc<SampleFileDir>, Error> {
|
||||
let read_write = db_meta.in_progress_open.is_some();
|
||||
let s = SampleFileDir::open_self(path, false)?;
|
||||
s.fd.lock(if read_write {
|
||||
FlockArg::LockExclusiveNonblock
|
||||
} else {
|
||||
FlockArg::LockSharedNonblock
|
||||
})
|
||||
.map_err(|e| e.context(format!("unable to lock dir {}", path)))?;
|
||||
let dir_meta = read_meta(&s.fd).map_err(|e| e.context("unable to read meta file"))?;
|
||||
if !SampleFileDir::consistent(db_meta, &dir_meta) {
|
||||
let serialized = db_meta
|
||||
.write_length_delimited_to_bytes()
|
||||
.expect("proto3->vec is infallible");
|
||||
bail!(
|
||||
"metadata mismatch.\ndb: {:#?}\ndir: {:#?}\nserialized db: {:#?}",
|
||||
db_meta,
|
||||
&dir_meta,
|
||||
&serialized
|
||||
);
|
||||
}
|
||||
if db_meta.in_progress_open.is_some() {
|
||||
s.write_meta(db_meta)?;
|
||||
}
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
/// Returns true if the existing directory and database metadata are consistent; the directory
|
||||
/// is then openable.
|
||||
pub(crate) fn consistent(db_meta: &schema::DirMeta, dir_meta: &schema::DirMeta) -> bool {
|
||||
if dir_meta.db_uuid != db_meta.db_uuid {
|
||||
return false;
|
||||
}
|
||||
if dir_meta.dir_uuid != db_meta.dir_uuid {
|
||||
return false;
|
||||
}
|
||||
|
||||
if db_meta.last_complete_open.is_some()
|
||||
&& (db_meta.last_complete_open != dir_meta.last_complete_open
|
||||
&& db_meta.last_complete_open != dir_meta.in_progress_open)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if db_meta.last_complete_open.is_none() && dir_meta.last_complete_open.is_some() {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn create(
|
||||
path: &str,
|
||||
db_meta: &schema::DirMeta,
|
||||
) -> Result<Arc<SampleFileDir>, Error> {
|
||||
let s = SampleFileDir::open_self(path, true)?;
|
||||
s.fd.lock(FlockArg::LockExclusiveNonblock)
|
||||
.map_err(|e| e.context(format!("unable to lock dir {}", path)))?;
|
||||
let old_meta = read_meta(&s.fd)?;
|
||||
|
||||
// Verify metadata. We only care that it hasn't been completely opened.
|
||||
// Partial opening by this or another database is fine; we won't overwrite anything.
|
||||
if old_meta.last_complete_open.is_some() {
|
||||
bail!(
|
||||
"Can't create dir at path {}: is already in use:\n{:?}",
|
||||
path,
|
||||
old_meta
|
||||
);
|
||||
}
|
||||
if !s.is_empty()? {
|
||||
bail!("Can't create dir at path {} with existing files", path);
|
||||
}
|
||||
s.write_meta(db_meta)?;
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
pub(crate) fn opendir(&self) -> Result<nix::dir::Dir, nix::Error> {
|
||||
nix::dir::Dir::openat(
|
||||
self.fd.as_raw_fd(),
|
||||
".",
|
||||
OFlag::O_DIRECTORY | OFlag::O_RDONLY,
|
||||
Mode::empty(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Determines if the directory is empty, aside form metadata.
|
||||
pub(crate) fn is_empty(&self) -> Result<bool, Error> {
|
||||
let mut dir = self.opendir()?;
|
||||
for e in dir.iter() {
|
||||
let e = e?;
|
||||
match e.file_name().to_bytes() {
|
||||
b"." | b".." => continue,
|
||||
b"meta" => continue, // existing metadata is fine.
|
||||
_ => return Ok(false),
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
|
||||
let fd = Arc::new(Fd::open(path, create)?);
|
||||
let reader = reader::Reader::spawn(path, fd.clone());
|
||||
Ok(Arc::new(SampleFileDir {
|
||||
fd,
|
||||
reader,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Opens the given sample file for reading.
|
||||
pub fn open_file(&self, composite_id: CompositeId, range: Range<u64>) -> reader::FileStream {
|
||||
self.reader.open_file(composite_id, range)
|
||||
}
|
||||
|
||||
pub fn create_file(&self, composite_id: CompositeId) -> Result<fs::File, nix::Error> {
|
||||
let p = CompositeIdPath::from(composite_id);
|
||||
crate::fs::openat(
|
||||
self.fd.0,
|
||||
&p,
|
||||
OFlag::O_WRONLY | OFlag::O_EXCL | OFlag::O_CREAT,
|
||||
Mode::S_IRUSR | Mode::S_IWUSR,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn write_meta(&self, meta: &schema::DirMeta) -> Result<(), Error> {
|
||||
write_meta(self.fd.0, meta)
|
||||
}
|
||||
|
||||
pub fn statfs(&self) -> Result<Statvfs, nix::Error> {
|
||||
self.fd.statfs()
|
||||
}
|
||||
|
||||
/// Unlinks the given sample file within this directory.
|
||||
pub(crate) fn unlink_file(&self, id: CompositeId) -> Result<(), nix::Error> {
|
||||
let p = CompositeIdPath::from(id);
|
||||
nix::unistd::unlinkat(Some(self.fd.0), &p, nix::unistd::UnlinkatFlags::NoRemoveDir)
|
||||
}
|
||||
|
||||
/// Syncs the directory itself.
|
||||
pub(crate) fn sync(&self) -> Result<(), nix::Error> {
|
||||
self.fd.sync()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a composite id filename.
|
||||
///
|
||||
/// These are exactly 16 bytes, lowercase hex, as created by [CompositeIdPath].
|
||||
pub(crate) fn parse_id(id: &[u8]) -> Result<CompositeId, ()> {
|
||||
if id.len() != 16 {
|
||||
return Err(());
|
||||
}
|
||||
let mut v: u64 = 0;
|
||||
for b in id {
|
||||
v = (v << 4)
|
||||
| match b {
|
||||
b @ b'0'..=b'9' => b - b'0',
|
||||
b @ b'a'..=b'f' => b - b'a' + 10,
|
||||
_ => return Err(()),
|
||||
} as u64;
|
||||
}
|
||||
Ok(CompositeId(v as i64))
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that valid 16-digit lowercase hex names parse and malformed names are rejected.
    #[test]
    fn parse_id() {
        use super::parse_id;
        assert_eq!(parse_id(b"0000000000000000").unwrap().0, 0);
        assert_eq!(parse_id(b"0000000100000002").unwrap().0, 0x0000000100000002);
        parse_id(b"").unwrap_err();
        parse_id(b"meta").unwrap_err();
        parse_id(b"0").unwrap_err();
        parse_id(b"000000010000000x").unwrap_err();
    }

    /// Ensures that a DirMeta with all fields filled fits within the maximum size.
    #[test]
    fn max_len_meta() {
        let mut meta = schema::DirMeta::new();
        let fake_uuid = &[0u8; 16][..];
        meta.db_uuid.extend_from_slice(fake_uuid);
        meta.dir_uuid.extend_from_slice(fake_uuid);
        {
            let o = meta.last_complete_open.set_default();
            o.id = u32::MAX;
            o.uuid.extend_from_slice(fake_uuid);
        }
        {
            let o = meta.in_progress_open.set_default();
            o.id = u32::MAX;
            o.uuid.extend_from_slice(fake_uuid);
        }
        let data = meta
            .write_length_delimited_to_bytes()
            .expect("proto3->vec is infallible");
        assert!(
            data.len() <= FIXED_DIR_META_LEN,
            "{} vs {}",
            data.len(),
            FIXED_DIR_META_LEN
        );
    }
}
|
||||
372
server/db/dir/reader.rs
Normal file
372
server/db/dir/reader.rs
Normal file
@@ -0,0 +1,372 @@
|
||||
// This file is part of Moonfire NVR, a security camera network video recorder.
|
||||
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
|
||||
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception
|
||||
|
||||
//! Reads sample files in a dedicated thread.
//!
//! Typically sample files are on spinning disk where IO operations take
//! ~10 ms on success. When disks fail, operations can stall for arbitrarily
//! long. POSIX doesn't have good support for asynchronous disk IO,
//! so it's desirable to do this from a dedicated thread for each disk rather
//! than stalling the tokio IO threads or (as when using `tokio::fs`) creating
//! unbounded numbers of workers.
//!
//! This also has some minor theoretical efficiency advantages over
//! `tokio::fs::File`:
//! *   it uses `mmap`, which means fewer system calls and a somewhat faster
//!     userspace `memcpy` implementation (see [Why mmap is faster than system
//!     calls](https://sasha-f.medium.com/why-mmap-is-faster-than-system-calls-24718e75ab37)).
//! *   it has fewer thread handoffs because it batches operations on open
//!     (open, fstat, mmap, madvise, close, memcpy first chunk) and close
//!     (memcpy last chunk, munmap).
|
||||
|
||||
use std::convert::TryFrom;
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
use std::{ops::Range, pin::Pin, sync::Arc, task::{Context, Poll}};
|
||||
use std::future::Future;
|
||||
|
||||
use base::bail_t;
|
||||
use base::clock::{RealClocks, TimerGuard};
|
||||
use base::{Error, ErrorKind, ResultExt, format_err_t};
|
||||
use nix::{fcntl::OFlag, sys::stat::Mode};
|
||||
|
||||
use crate::CompositeId;
|
||||
|
||||
/// Handle for a reader thread, used to send it commands.
|
||||
///
|
||||
/// The reader will shut down after the last handle is closed.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) struct Reader(
|
||||
// This is an unbounded channel because FileStream::drop can't block.
|
||||
// There is backpressure elsewhere: typically the caller will await the
|
||||
// open/read reply before sending another request.
|
||||
tokio::sync::mpsc::UnboundedSender<ReaderCommand>
|
||||
);
|
||||
|
||||
impl Reader {
|
||||
pub(super) fn spawn(path: &str, dir: Arc<super::Fd>) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let page_size = usize::try_from(
|
||||
nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
|
||||
.expect("PAGE_SIZE fetch must succeed")
|
||||
.expect("PAGE_SIZE must be defined")
|
||||
).expect("PAGE_SIZE fits in usize");
|
||||
assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size);
|
||||
std::thread::Builder::new()
|
||||
.name(format!("r-{}", path))
|
||||
.spawn(move || ReaderInt { dir, page_size }.run(rx))
|
||||
.expect("unable to create reader thread");
|
||||
Self(tx)
|
||||
}
|
||||
|
||||
pub(super) fn open_file(&self, composite_id: CompositeId, range: Range<u64>) -> FileStream {
|
||||
if range.is_empty() {
|
||||
return FileStream {
|
||||
state: FileStreamState::Invalid,
|
||||
reader: Reader(self.0.clone()),
|
||||
}
|
||||
}
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.send(ReaderCommand::OpenFile {
|
||||
composite_id,
|
||||
range,
|
||||
tx,
|
||||
});
|
||||
FileStream {
|
||||
state: FileStreamState::Reading(rx),
|
||||
reader: Reader(self.0.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn send(&self, cmd: ReaderCommand) {
|
||||
self.0.send(cmd).map_err(|_| ()).expect("reader thread panicked; see logs.");
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FileStream {
|
||||
state: FileStreamState,
|
||||
reader: Reader,
|
||||
}
|
||||
|
||||
/// Receiving end of a reply from the reader thread: on success, the file
/// handle (or `None` once the last chunk has been read) plus the chunk.
type ReadReceiver = tokio::sync::oneshot::Receiver<Result<(Option<OpenFile>, Vec<u8>), Error>>;
|
||||
|
||||
enum FileStreamState {
|
||||
Idle(OpenFile),
|
||||
Reading(ReadReceiver),
|
||||
Invalid,
|
||||
}
|
||||
|
||||
impl FileStream {
|
||||
/// Helper for reading during `poll_next`.
|
||||
fn read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
mut rx: ReadReceiver,
|
||||
) -> Poll<Option<Result<Vec<u8>, Error>>> {
|
||||
match Pin::new(&mut rx).poll(cx) {
|
||||
Poll::Ready(Err(_)) => {
|
||||
self.state = FileStreamState::Invalid;
|
||||
Poll::Ready(Some(Err(format_err_t!(Internal, "reader thread panicked; see logs"))))
|
||||
},
|
||||
Poll::Ready(Ok(Err(e))) => {
|
||||
self.state = FileStreamState::Invalid;
|
||||
Poll::Ready(Some(Err(e)))
|
||||
},
|
||||
Poll::Ready(Ok(Ok((Some(file), chunk)))) => {
|
||||
self.state = FileStreamState::Idle(file);
|
||||
Poll::Ready(Some(Ok(chunk)))
|
||||
},
|
||||
Poll::Ready(Ok(Ok((None, chunk)))) => {
|
||||
self.state = FileStreamState::Invalid;
|
||||
Poll::Ready(Some(Ok(chunk)))
|
||||
},
|
||||
Poll::Pending => {
|
||||
self.state = FileStreamState::Reading(rx);
|
||||
Poll::Pending
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl futures::stream::Stream for FileStream {
|
||||
type Item = Result<Vec<u8>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match std::mem::replace(&mut self.state, FileStreamState::Invalid) {
|
||||
FileStreamState::Idle(file) => {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.reader.send(ReaderCommand::ReadNextChunk {
|
||||
file,
|
||||
tx,
|
||||
});
|
||||
|
||||
// Try reading right away. It probably will return pending, but Receiver
|
||||
// needs to see the waker.
|
||||
self.read(cx, rx)
|
||||
},
|
||||
FileStreamState::Reading(rx) => self.read(cx, rx),
|
||||
FileStreamState::Invalid => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FileStream {
|
||||
fn drop(&mut self) {
|
||||
use FileStreamState::{Idle, Invalid};
|
||||
if let Idle(file) = std::mem::replace(&mut self.state, Invalid) {
|
||||
// This will succeed unless reader has panicked. If that happened,
|
||||
// the logfiles will be loud anyway; no need to add additional
|
||||
// error messages.
|
||||
let _ = self.reader.0.send(ReaderCommand::CloseFile(file));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An open, `mmap()`ed file.
|
||||
///
|
||||
/// This is only actually used by the reader thread, but ownership is passed
|
||||
/// around between it and the [FileStream] to avoid maintaining extra data
|
||||
/// structures.
|
||||
struct OpenFile {
|
||||
composite_id: CompositeId,
|
||||
|
||||
/// The memory-mapped region backed by the file. Valid up to length `map_len`.
|
||||
map_ptr: *mut libc::c_void,
|
||||
|
||||
/// The position within the memory mapping. Invariant: `map_pos < map_len`.
|
||||
map_pos: usize,
|
||||
|
||||
/// The length of the memory mapping. This may be less than the length of
|
||||
/// the file.
|
||||
map_len: usize,
|
||||
}
|
||||
|
||||
// Rust makes us manually state these because of the `*mut` ptr above.
|
||||
unsafe impl Send for OpenFile {}
|
||||
unsafe impl Sync for OpenFile {}
|
||||
|
||||
impl Drop for OpenFile {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = unsafe {
|
||||
nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len)
|
||||
} {
|
||||
// This should never happen.
|
||||
log::error!("unable to munmap {}, {:?} len {}: {}",
|
||||
self.composite_id, self.map_ptr, self.map_len, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ReaderCommand {
|
||||
/// Opens a file and reads the first chunk.
|
||||
OpenFile {
|
||||
composite_id: CompositeId,
|
||||
range: std::ops::Range<u64>,
|
||||
tx: tokio::sync::oneshot::Sender<Result<(Option<OpenFile>, Vec<u8>), Error>>,
|
||||
},
|
||||
|
||||
/// Reads the next chunk of the file.
|
||||
ReadNextChunk {
|
||||
file: OpenFile,
|
||||
tx: tokio::sync::oneshot::Sender<Result<(Option<OpenFile>, Vec<u8>), Error>>,
|
||||
},
|
||||
|
||||
/// Closes the file early, as when the [FileStream] is dropped before completing.
|
||||
CloseFile(OpenFile),
|
||||
}
|
||||
|
||||
struct ReaderInt {
|
||||
/// File descriptor of the sample file directory.
|
||||
dir: Arc<super::Fd>,
|
||||
|
||||
/// The page size as returned by `sysconf`; guaranteed to be a power of two.
|
||||
page_size: usize,
|
||||
}
|
||||
|
||||
impl ReaderInt {
|
||||
fn run(self, mut rx: tokio::sync::mpsc::UnboundedReceiver<ReaderCommand>) {
|
||||
while let Some(cmd) = rx.blocking_recv() {
|
||||
// OpenFile's Drop implementation takes care of closing the file on error paths and
|
||||
// the CloseFile operation.
|
||||
match cmd {
|
||||
ReaderCommand::OpenFile { composite_id, range, tx } => {
|
||||
if tx.is_closed() { // avoid spending effort on expired commands
|
||||
continue;
|
||||
}
|
||||
let _guard = TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id));
|
||||
let _ = tx.send(self.open(composite_id, range));
|
||||
},
|
||||
ReaderCommand::ReadNextChunk { file, tx } => {
|
||||
if tx.is_closed() { // avoid spending effort on expired commands
|
||||
continue;
|
||||
}
|
||||
let composite_id = file.composite_id;
|
||||
let _guard = TimerGuard::new(
|
||||
&RealClocks {},
|
||||
|| format!("read from {}", composite_id),
|
||||
);
|
||||
let _ = tx.send(Ok(self.chunk(file)));
|
||||
},
|
||||
ReaderCommand::CloseFile(_) => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn open(
|
||||
&self,
|
||||
composite_id: CompositeId,
|
||||
range: Range<u64>
|
||||
) -> Result<(Option<OpenFile>, Vec<u8>), Error> {
|
||||
let p = super::CompositeIdPath::from(composite_id);
|
||||
|
||||
// Reader::open_file checks for an empty range, but check again right
|
||||
// before the unsafe block to make it easier to audit the safety constraints.
|
||||
assert!(range.start < range.end);
|
||||
|
||||
// mmap offsets must be aligned to page size boundaries.
|
||||
let unaligned = (range.start as usize) & (self.page_size - 1);
|
||||
let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t") -
|
||||
libc::off_t::try_from(unaligned).expect("usize fits in off_t");
|
||||
|
||||
// Recordings from very high bitrate streams could theoretically exceed exhaust a 32-bit
|
||||
// machine's address space, causing either this usize::MAX error or mmap
|
||||
// failure. If that happens in practice, we'll have to stop mmap()ing
|
||||
// the whole range.
|
||||
let map_len = usize::try_from(
|
||||
range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64"))
|
||||
.map_err(|_| format_err_t!(
|
||||
OutOfRange, "file {}'s range {:?} len exceeds usize::MAX",
|
||||
composite_id, range))?;
|
||||
|
||||
let file = crate::fs::openat(self.dir.0, &p, OFlag::O_RDONLY, Mode::empty())
|
||||
.err_kind(ErrorKind::Unknown)?;
|
||||
|
||||
// Check the actual on-disk file length. It's an error (a bug or filesystem corruption)
|
||||
// for it to be less than the requested read. Check for this now rather than crashing
|
||||
// with a SIGBUS or reading bad data at the end of the last page later.
|
||||
let metadata = file.metadata().err_kind(ErrorKind::Unknown)?;
|
||||
if metadata.len() < u64::try_from(offset).unwrap() + u64::try_from(map_len).unwrap() {
|
||||
bail_t!(Internal, "file {}, range {:?}, len {}", composite_id, range, metadata.len());
|
||||
}
|
||||
let map_ptr = unsafe {
|
||||
nix::sys::mman::mmap(
|
||||
std::ptr::null_mut(),
|
||||
map_len,
|
||||
nix::sys::mman::ProtFlags::PROT_READ,
|
||||
nix::sys::mman::MapFlags::MAP_SHARED,
|
||||
file.as_raw_fd(),
|
||||
offset
|
||||
)
|
||||
}.map_err(|e| format_err_t!(Internal, "mmap failed for {} off={} len={}: {}",
|
||||
composite_id, offset, map_len, e))?;
|
||||
|
||||
if let Err(e) = unsafe {
|
||||
nix::sys::mman::madvise(
|
||||
map_ptr as *mut libc::c_void,
|
||||
map_len,
|
||||
nix::sys::mman::MmapAdvise::MADV_SEQUENTIAL,
|
||||
)
|
||||
} {
|
||||
// This shouldn't happen but is "just" a performance problem.
|
||||
log::warn!("madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}",
|
||||
composite_id, offset, map_len, e);
|
||||
}
|
||||
|
||||
Ok(self.chunk(OpenFile {
|
||||
composite_id,
|
||||
map_ptr,
|
||||
map_pos: unaligned,
|
||||
map_len,
|
||||
}))
|
||||
}
|
||||
|
||||
fn chunk(&self, mut file: OpenFile) -> (Option<OpenFile>, Vec<u8>) {
|
||||
// Read a chunk that's large enough to minimize thread handoffs but
|
||||
// short enough to keep memory usage under control. It's hopefully
|
||||
// unnecessary to worry about disk seeks; the madvise call should cause
|
||||
// the kernel to read ahead.
|
||||
let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1<<16));
|
||||
let mut chunk = Vec::new();
|
||||
let len = end.checked_sub(file.map_pos).unwrap();
|
||||
chunk.reserve_exact(len);
|
||||
|
||||
// SAFETY: [map_pos, map_pos + len) is verified to be within map_ptr.
|
||||
//
|
||||
// If the read is out of bounds of the file, we'll get a SIGBUS.
|
||||
// That's not a safety violation. It also shouldn't happen because the
|
||||
// length was set properly at open time, Moonfire NVR is a closed
|
||||
// system (nothing else ever touches its files), and sample files are
|
||||
// never truncated (only appended to or unlinked).
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(
|
||||
file.map_ptr.add(file.map_pos) as *const u8,
|
||||
chunk.as_mut_ptr(),
|
||||
len,
|
||||
);
|
||||
chunk.set_len(len);
|
||||
}
|
||||
let file = if end == file.map_len {
|
||||
None
|
||||
} else {
|
||||
file.map_pos = end;
|
||||
Some(file)
|
||||
};
|
||||
(file, chunk)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;

    /// Reads an unaligned sub-range of a small file end-to-end through a real reader thread.
    #[tokio::test]
    async fn basic() {
        crate::testutil::init();
        let tmpdir = tempfile::Builder::new()
            .prefix("moonfire-db-test-reader")
            .tempdir()
            .unwrap();
        let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap());
        let reader = super::Reader::spawn("/path/goes/here", fd);
        std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap();
        let f = reader.open_file(crate::CompositeId(0x01234567_89abcdef), 1..8);
        assert_eq!(f.try_concat().await.unwrap(), b"lah bla");
    }
}
|
||||
Reference in New Issue
Block a user