cargo fmt

This commit is contained in:
Scott Lamb 2021-06-04 20:25:19 -07:00
parent 23d77693de
commit bb69d1488e
3 changed files with 111 additions and 68 deletions

View File

@ -314,10 +314,7 @@ impl SampleFileDir {
fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> { fn open_self(path: &str, create: bool) -> Result<Arc<SampleFileDir>, Error> {
let fd = Arc::new(Fd::open(path, create)?); let fd = Arc::new(Fd::open(path, create)?);
let reader = reader::Reader::spawn(path, fd.clone()); let reader = reader::Reader::spawn(path, fd.clone());
Ok(Arc::new(SampleFileDir { Ok(Arc::new(SampleFileDir { fd, reader }))
fd,
reader,
}))
} }
/// Opens the given sample file for reading. /// Opens the given sample file for reading.

View File

@ -21,13 +21,18 @@
//! (memcpy last chunk, munmap). //! (memcpy last chunk, munmap).
use std::convert::TryFrom; use std::convert::TryFrom;
use std::os::unix::prelude::AsRawFd;
use std::{ops::Range, pin::Pin, sync::Arc, task::{Context, Poll}};
use std::future::Future; use std::future::Future;
use std::os::unix::prelude::AsRawFd;
use std::{
ops::Range,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use base::bail_t; use base::bail_t;
use base::clock::{RealClocks, TimerGuard}; use base::clock::{RealClocks, TimerGuard};
use base::{Error, ErrorKind, ResultExt, format_err_t}; use base::{format_err_t, Error, ErrorKind, ResultExt};
use nix::{fcntl::OFlag, sys::stat::Mode}; use nix::{fcntl::OFlag, sys::stat::Mode};
use crate::CompositeId; use crate::CompositeId;
@ -36,21 +41,17 @@ use crate::CompositeId;
/// ///
/// The reader will shut down after the last handle is closed. /// The reader will shut down after the last handle is closed.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(super) struct Reader( pub(super) struct Reader(tokio::sync::mpsc::UnboundedSender<ReaderCommand>);
// This is an unbounded channel because FileStream::drop can't block.
// There is backpressure elsewhere: typically the caller will await the
// open/read reply before sending another request.
tokio::sync::mpsc::UnboundedSender<ReaderCommand>
);
impl Reader { impl Reader {
pub(super) fn spawn(path: &str, dir: Arc<super::Fd>) -> Self { pub(super) fn spawn(path: &str, dir: Arc<super::Fd>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let page_size = usize::try_from( let page_size = usize::try_from(
nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE) nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE)
.expect("PAGE_SIZE fetch must succeed") .expect("PAGE_SIZE fetch must succeed")
.expect("PAGE_SIZE must be defined") .expect("PAGE_SIZE must be defined"),
).expect("PAGE_SIZE fits in usize"); )
.expect("PAGE_SIZE fits in usize");
assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size); assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size);
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("r-{}", path)) .name(format!("r-{}", path))
@ -64,7 +65,7 @@ impl Reader {
return FileStream { return FileStream {
state: FileStreamState::Invalid, state: FileStreamState::Invalid,
reader: Reader(self.0.clone()), reader: Reader(self.0.clone()),
} };
} }
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
self.send(ReaderCommand::OpenFile { self.send(ReaderCommand::OpenFile {
@ -79,7 +80,10 @@ impl Reader {
} }
fn send(&self, cmd: ReaderCommand) { fn send(&self, cmd: ReaderCommand) {
self.0.send(cmd).map_err(|_| ()).expect("reader thread panicked; see logs."); self.0
.send(cmd)
.map_err(|_| ())
.expect("reader thread panicked; see logs.");
} }
} }
@ -106,24 +110,27 @@ impl FileStream {
match Pin::new(&mut rx).poll(cx) { match Pin::new(&mut rx).poll(cx) {
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
self.state = FileStreamState::Invalid; self.state = FileStreamState::Invalid;
Poll::Ready(Some(Err(format_err_t!(Internal, "reader thread panicked; see logs")))) Poll::Ready(Some(Err(format_err_t!(
}, Internal,
"reader thread panicked; see logs"
))))
}
Poll::Ready(Ok(Err(e))) => { Poll::Ready(Ok(Err(e))) => {
self.state = FileStreamState::Invalid; self.state = FileStreamState::Invalid;
Poll::Ready(Some(Err(e))) Poll::Ready(Some(Err(e)))
}, }
Poll::Ready(Ok(Ok((Some(file), chunk)))) => { Poll::Ready(Ok(Ok((Some(file), chunk)))) => {
self.state = FileStreamState::Idle(file); self.state = FileStreamState::Idle(file);
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
}, }
Poll::Ready(Ok(Ok((None, chunk)))) => { Poll::Ready(Ok(Ok((None, chunk)))) => {
self.state = FileStreamState::Invalid; self.state = FileStreamState::Invalid;
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
}, }
Poll::Pending => { Poll::Pending => {
self.state = FileStreamState::Reading(rx); self.state = FileStreamState::Reading(rx);
Poll::Pending Poll::Pending
}, }
} }
} }
} }
@ -135,15 +142,12 @@ impl futures::stream::Stream for FileStream {
match std::mem::replace(&mut self.state, FileStreamState::Invalid) { match std::mem::replace(&mut self.state, FileStreamState::Invalid) {
FileStreamState::Idle(file) => { FileStreamState::Idle(file) => {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
self.reader.send(ReaderCommand::ReadNextChunk { self.reader.send(ReaderCommand::ReadNextChunk { file, tx });
file,
tx,
});
// Try reading right away. It probably will return pending, but Receiver // Try reading right away. It probably will return pending, but Receiver
// needs to see the waker. // needs to see the waker.
self.read(cx, rx) self.read(cx, rx)
}, }
FileStreamState::Reading(rx) => self.read(cx, rx), FileStreamState::Reading(rx) => self.read(cx, rx),
FileStreamState::Invalid => Poll::Ready(None), FileStreamState::Invalid => Poll::Ready(None),
} }
@ -187,12 +191,17 @@ unsafe impl Sync for OpenFile {}
impl Drop for OpenFile { impl Drop for OpenFile {
fn drop(&mut self) { fn drop(&mut self) {
if let Err(e) = unsafe { if let Err(e) =
nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len) unsafe { nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len) }
} { {
// This should never happen. // This should never happen.
log::error!("unable to munmap {}, {:?} len {}: {}", log::error!(
self.composite_id, self.map_ptr, self.map_len, e); "unable to munmap {}, {:?} len {}: {}",
self.composite_id,
self.map_ptr,
self.map_len,
e
);
} }
} }
} }
@ -229,25 +238,30 @@ impl ReaderInt {
// OpenFile's Drop implementation takes care of closing the file on error paths and // OpenFile's Drop implementation takes care of closing the file on error paths and
// the CloseFile operation. // the CloseFile operation.
match cmd { match cmd {
ReaderCommand::OpenFile { composite_id, range, tx } => { ReaderCommand::OpenFile {
if tx.is_closed() { // avoid spending effort on expired commands composite_id,
range,
tx,
} => {
if tx.is_closed() {
// avoid spending effort on expired commands
continue; continue;
} }
let _guard = TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id)); let _guard =
TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id));
let _ = tx.send(self.open(composite_id, range)); let _ = tx.send(self.open(composite_id, range));
}, }
ReaderCommand::ReadNextChunk { file, tx } => { ReaderCommand::ReadNextChunk { file, tx } => {
if tx.is_closed() { // avoid spending effort on expired commands if tx.is_closed() {
// avoid spending effort on expired commands
continue; continue;
} }
let composite_id = file.composite_id; let composite_id = file.composite_id;
let _guard = TimerGuard::new( let _guard =
&RealClocks {}, TimerGuard::new(&RealClocks {}, || format!("read from {}", composite_id));
|| format!("read from {}", composite_id),
);
let _ = tx.send(Ok(self.chunk(file))); let _ = tx.send(Ok(self.chunk(file)));
}, }
ReaderCommand::CloseFile(_) => {}, ReaderCommand::CloseFile(_) => {}
} }
} }
} }
@ -255,7 +269,7 @@ impl ReaderInt {
fn open( fn open(
&self, &self,
composite_id: CompositeId, composite_id: CompositeId,
range: Range<u64> range: Range<u64>,
) -> Result<(Option<OpenFile>, Vec<u8>), Error> { ) -> Result<(Option<OpenFile>, Vec<u8>), Error> {
let p = super::CompositeIdPath::from(composite_id); let p = super::CompositeIdPath::from(composite_id);
@ -265,18 +279,24 @@ impl ReaderInt {
// mmap offsets must be aligned to page size boundaries. // mmap offsets must be aligned to page size boundaries.
let unaligned = (range.start as usize) & (self.page_size - 1); let unaligned = (range.start as usize) & (self.page_size - 1);
let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t") - let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t")
libc::off_t::try_from(unaligned).expect("usize fits in off_t"); - libc::off_t::try_from(unaligned).expect("usize fits in off_t");
// Recordings from very high bitrate streams could theoretically exceed exhaust a 32-bit // Recordings from very high bitrate streams could theoretically exceed exhaust a 32-bit
// machine's address space, causing either this usize::MAX error or mmap // machine's address space, causing either this usize::MAX error or mmap
// failure. If that happens in practice, we'll have to stop mmap()ing // failure. If that happens in practice, we'll have to stop mmap()ing
// the whole range. // the whole range.
let map_len = usize::try_from( let map_len = usize::try_from(
range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64")) range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64"),
.map_err(|_| format_err_t!( )
OutOfRange, "file {}'s range {:?} len exceeds usize::MAX", .map_err(|_| {
composite_id, range))?; format_err_t!(
OutOfRange,
"file {}'s range {:?} len exceeds usize::MAX",
composite_id,
range
)
})?;
let file = crate::fs::openat(self.dir.0, &p, OFlag::O_RDONLY, Mode::empty()) let file = crate::fs::openat(self.dir.0, &p, OFlag::O_RDONLY, Mode::empty())
.err_kind(ErrorKind::Unknown)?; .err_kind(ErrorKind::Unknown)?;
@ -286,7 +306,13 @@ impl ReaderInt {
// with a SIGBUS or reading bad data at the end of the last page later. // with a SIGBUS or reading bad data at the end of the last page later.
let metadata = file.metadata().err_kind(ErrorKind::Unknown)?; let metadata = file.metadata().err_kind(ErrorKind::Unknown)?;
if metadata.len() < u64::try_from(offset).unwrap() + u64::try_from(map_len).unwrap() { if metadata.len() < u64::try_from(offset).unwrap() + u64::try_from(map_len).unwrap() {
bail_t!(Internal, "file {}, range {:?}, len {}", composite_id, range, metadata.len()); bail_t!(
Internal,
"file {}, range {:?}, len {}",
composite_id,
range,
metadata.len()
);
} }
let map_ptr = unsafe { let map_ptr = unsafe {
nix::sys::mman::mmap( nix::sys::mman::mmap(
@ -295,10 +321,19 @@ impl ReaderInt {
nix::sys::mman::ProtFlags::PROT_READ, nix::sys::mman::ProtFlags::PROT_READ,
nix::sys::mman::MapFlags::MAP_SHARED, nix::sys::mman::MapFlags::MAP_SHARED,
file.as_raw_fd(), file.as_raw_fd(),
offset offset,
) )
}.map_err(|e| format_err_t!(Internal, "mmap failed for {} off={} len={}: {}", }
composite_id, offset, map_len, e))?; .map_err(|e| {
format_err_t!(
Internal,
"mmap failed for {} off={} len={}: {}",
composite_id,
offset,
map_len,
e
)
})?;
if let Err(e) = unsafe { if let Err(e) = unsafe {
nix::sys::mman::madvise( nix::sys::mman::madvise(
@ -308,8 +343,13 @@ impl ReaderInt {
) )
} { } {
// This shouldn't happen but is "just" a performance problem. // This shouldn't happen but is "just" a performance problem.
log::warn!("madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}", log::warn!(
composite_id, offset, map_len, e); "madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}",
composite_id,
offset,
map_len,
e
);
} }
Ok(self.chunk(OpenFile { Ok(self.chunk(OpenFile {
@ -325,7 +365,7 @@ impl ReaderInt {
// short enough to keep memory usage under control. It's hopefully // short enough to keep memory usage under control. It's hopefully
// unnecessary to worry about disk seeks; the madvise call should cause // unnecessary to worry about disk seeks; the madvise call should cause
// the kernel to read ahead. // the kernel to read ahead.
let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1<<16)); let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1 << 16));
let mut chunk = Vec::new(); let mut chunk = Vec::new();
let len = end.checked_sub(file.map_pos).unwrap(); let len = end.checked_sub(file.map_pos).unwrap();
chunk.reserve_exact(len); chunk.reserve_exact(len);
@ -362,7 +402,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn basic() { async fn basic() {
crate::testutil::init(); crate::testutil::init();
let tmpdir = tempfile::Builder::new().prefix("moonfire-db-test-reader").tempdir().unwrap(); let tmpdir = tempfile::Builder::new()
.prefix("moonfire-db-test-reader")
.tempdir()
.unwrap();
let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap()); let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap());
let reader = super::Reader::spawn("/path/goes/here", fd); let reader = super::Reader::spawn("/path/goes/here", fd);
std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap(); std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap();

View File

@ -1799,19 +1799,22 @@ impl FileInner {
/// ///
/// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't /// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't
/// happen because nothing should be touching Moonfire NVR's files but itself. /// happen because nothing should be touching Moonfire NVR's files but itself.
fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> Box<dyn Stream<Item = Result<Chunk, BoxedError>> + Send + Sync> { fn get_video_sample_data(
&self,
i: usize,
r: Range<u64>,
) -> Box<dyn Stream<Item = Result<Chunk, BoxedError>> + Send + Sync> {
let s = &self.segments[i]; let s = &self.segments[i];
let sr = s.s.sample_file_range(); let sr = s.s.sample_file_range();
let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) { let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) {
None => return Box::new(stream::iter(std::iter::once(Err( None => {
wrap_error(format_err_t!(NotFound, "{}: stream not found", s.s.id)) return Box::new(stream::iter(std::iter::once(Err(wrap_error(
)))), format_err_t!(NotFound, "{}: stream not found", s.s.id),
)))))
}
Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start - r.start)), Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start - r.start)),
}; };
Box::new(f Box::new(f.map_ok(Chunk::from).map_err(wrap_error))
.map_ok(Chunk::from)
.map_err(wrap_error)
)
} }
fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, len: u64) -> Result<Chunk, Error> { fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, len: u64) -> Result<Chunk, Error> {