From bb69d1488efc0dbbd387a7013ecb8df6d8bfd22f Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Fri, 4 Jun 2021 20:25:19 -0700 Subject: [PATCH] cargo fmt --- server/db/dir/mod.rs | 5 +- server/db/dir/reader.rs | 155 +++++++++++++++++++++++++--------------- server/src/mp4.rs | 19 ++--- 3 files changed, 111 insertions(+), 68 deletions(-) diff --git a/server/db/dir/mod.rs b/server/db/dir/mod.rs index 1b3ade7..fb0c257 100644 --- a/server/db/dir/mod.rs +++ b/server/db/dir/mod.rs @@ -314,10 +314,7 @@ impl SampleFileDir { fn open_self(path: &str, create: bool) -> Result, Error> { let fd = Arc::new(Fd::open(path, create)?); let reader = reader::Reader::spawn(path, fd.clone()); - Ok(Arc::new(SampleFileDir { - fd, - reader, - })) + Ok(Arc::new(SampleFileDir { fd, reader })) } /// Opens the given sample file for reading. diff --git a/server/db/dir/reader.rs b/server/db/dir/reader.rs index e73a149..738b664 100644 --- a/server/db/dir/reader.rs +++ b/server/db/dir/reader.rs @@ -21,13 +21,18 @@ //! (memcpy last chunk, munmap). use std::convert::TryFrom; -use std::os::unix::prelude::AsRawFd; -use std::{ops::Range, pin::Pin, sync::Arc, task::{Context, Poll}}; use std::future::Future; +use std::os::unix::prelude::AsRawFd; +use std::{ + ops::Range, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use base::bail_t; use base::clock::{RealClocks, TimerGuard}; -use base::{Error, ErrorKind, ResultExt, format_err_t}; +use base::{format_err_t, Error, ErrorKind, ResultExt}; use nix::{fcntl::OFlag, sys::stat::Mode}; use crate::CompositeId; @@ -36,21 +41,17 @@ use crate::CompositeId; /// /// The reader will shut down after the last handle is closed. #[derive(Clone, Debug)] -pub(super) struct Reader( - // This is an unbounded channel because FileStream::drop can't block. - // There is backpressure elsewhere: typically the caller will await the - // open/read reply before sending another request. - tokio::sync::mpsc::UnboundedSender -); +pub(super) struct Reader(tokio::sync::mpsc::UnboundedSender); impl Reader { pub(super) fn spawn(path: &str, dir: Arc) -> Self { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let page_size = usize::try_from( nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE) - .expect("PAGE_SIZE fetch must succeed") - .expect("PAGE_SIZE must be defined") - ).expect("PAGE_SIZE fits in usize"); + .expect("PAGE_SIZE fetch must succeed") + .expect("PAGE_SIZE must be defined"), + ) + .expect("PAGE_SIZE fits in usize"); assert_eq!(page_size.count_ones(), 1, "invalid page size {}", page_size); std::thread::Builder::new() .name(format!("r-{}", path)) @@ -64,7 +65,7 @@ impl Reader { return FileStream { state: FileStreamState::Invalid, reader: Reader(self.0.clone()), - } + }; } let (tx, rx) = tokio::sync::oneshot::channel(); self.send(ReaderCommand::OpenFile { @@ -79,7 +80,10 @@ impl Reader { } fn send(&self, cmd: ReaderCommand) { - self.0.send(cmd).map_err(|_| ()).expect("reader thread panicked; see logs."); + self.0 + .send(cmd) + .map_err(|_| ()) + .expect("reader thread panicked; see logs."); } } @@ -106,24 +110,27 @@ impl FileStream { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Err(_)) => { self.state = FileStreamState::Invalid; - Poll::Ready(Some(Err(format_err_t!(Internal, "reader thread panicked; see logs")))) - }, + Poll::Ready(Some(Err(format_err_t!( + Internal, + "reader thread panicked; see logs" + )))) + } Poll::Ready(Ok(Err(e))) => { self.state = FileStreamState::Invalid; Poll::Ready(Some(Err(e))) - }, + } Poll::Ready(Ok(Ok((Some(file), chunk)))) => { self.state = FileStreamState::Idle(file); Poll::Ready(Some(Ok(chunk))) - }, + } Poll::Ready(Ok(Ok((None, chunk)))) => { self.state = FileStreamState::Invalid; Poll::Ready(Some(Ok(chunk))) - }, + } Poll::Pending => { self.state = FileStreamState::Reading(rx); Poll::Pending - }, + } } } } @@ -135,15 +142,12 @@ impl futures::stream::Stream for FileStream { match std::mem::replace(&mut self.state, FileStreamState::Invalid) { FileStreamState::Idle(file) => { let (tx, rx) = tokio::sync::oneshot::channel(); - self.reader.send(ReaderCommand::ReadNextChunk { - file, - tx, - }); + self.reader.send(ReaderCommand::ReadNextChunk { file, tx }); // Try reading right away. It probably will return pending, but Receiver // needs to see the waker. self.read(cx, rx) - }, + } FileStreamState::Reading(rx) => self.read(cx, rx), FileStreamState::Invalid => Poll::Ready(None), } @@ -187,12 +191,17 @@ unsafe impl Sync for OpenFile {} impl Drop for OpenFile { fn drop(&mut self) { - if let Err(e) = unsafe { - nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len) - } { + if let Err(e) = + unsafe { nix::sys::mman::munmap(self.map_ptr as *mut std::ffi::c_void, self.map_len) } + { // This should never happen. - log::error!("unable to munmap {}, {:?} len {}: {}", - self.composite_id, self.map_ptr, self.map_len, e); + log::error!( + "unable to munmap {}, {:?} len {}: {}", + self.composite_id, + self.map_ptr, + self.map_len, + e + ); } } } @@ -229,25 +238,30 @@ impl ReaderInt { // OpenFile's Drop implementation takes care of closing the file on error paths and // the CloseFile operation. match cmd { - ReaderCommand::OpenFile { composite_id, range, tx } => { - if tx.is_closed() { // avoid spending effort on expired commands + ReaderCommand::OpenFile { + composite_id, + range, + tx, + } => { + if tx.is_closed() { + // avoid spending effort on expired commands continue; } - let _guard = TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id)); + let _guard = + TimerGuard::new(&RealClocks {}, || format!("open {}", composite_id)); let _ = tx.send(self.open(composite_id, range)); - }, + } ReaderCommand::ReadNextChunk { file, tx } => { - if tx.is_closed() { // avoid spending effort on expired commands + if tx.is_closed() { + // avoid spending effort on expired commands continue; } let composite_id = file.composite_id; - let _guard = TimerGuard::new( - &RealClocks {}, - || format!("read from {}", composite_id), - ); + let _guard = + TimerGuard::new(&RealClocks {}, || format!("read from {}", composite_id)); let _ = tx.send(Ok(self.chunk(file))); - }, - ReaderCommand::CloseFile(_) => {}, + } + ReaderCommand::CloseFile(_) => {} } } } @@ -255,7 +269,7 @@ impl ReaderInt { fn open( &self, composite_id: CompositeId, - range: Range + range: Range, ) -> Result<(Option, Vec), Error> { let p = super::CompositeIdPath::from(composite_id); @@ -265,18 +279,24 @@ impl ReaderInt { // mmap offsets must be aligned to page size boundaries. let unaligned = (range.start as usize) & (self.page_size - 1); - let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t") - - libc::off_t::try_from(unaligned).expect("usize fits in off_t"); + let offset = libc::off_t::try_from(range.start).expect("range.start fits in off_t") + - libc::off_t::try_from(unaligned).expect("usize fits in off_t"); // Recordings from very high bitrate streams could theoretically exceed exhaust a 32-bit // machine's address space, causing either this usize::MAX error or mmap // failure. If that happens in practice, we'll have to stop mmap()ing // the whole range. let map_len = usize::try_from( - range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64")) - .map_err(|_| format_err_t!( - OutOfRange, "file {}'s range {:?} len exceeds usize::MAX", - composite_id, range))?; + range.end - range.start + u64::try_from(unaligned).expect("usize fits in u64"), + ) + .map_err(|_| { + format_err_t!( + OutOfRange, + "file {}'s range {:?} len exceeds usize::MAX", + composite_id, + range + ) + })?; let file = crate::fs::openat(self.dir.0, &p, OFlag::O_RDONLY, Mode::empty()) .err_kind(ErrorKind::Unknown)?; @@ -286,7 +306,13 @@ impl ReaderInt { // with a SIGBUS or reading bad data at the end of the last page later. let metadata = file.metadata().err_kind(ErrorKind::Unknown)?; if metadata.len() < u64::try_from(offset).unwrap() + u64::try_from(map_len).unwrap() { - bail_t!(Internal, "file {}, range {:?}, len {}", composite_id, range, metadata.len()); + bail_t!( + Internal, + "file {}, range {:?}, len {}", + composite_id, + range, + metadata.len() + ); } let map_ptr = unsafe { nix::sys::mman::mmap( @@ -295,10 +321,19 @@ impl ReaderInt { nix::sys::mman::ProtFlags::PROT_READ, nix::sys::mman::MapFlags::MAP_SHARED, file.as_raw_fd(), - offset + offset, ) - }.map_err(|e| format_err_t!(Internal, "mmap failed for {} off={} len={}: {}", - composite_id, offset, map_len, e))?; + } + .map_err(|e| { + format_err_t!( + Internal, + "mmap failed for {} off={} len={}: {}", + composite_id, + offset, + map_len, + e + ) + })?; if let Err(e) = unsafe { nix::sys::mman::madvise( @@ -308,8 +343,13 @@ impl ReaderInt { ) } { // This shouldn't happen but is "just" a performance problem. - log::warn!("madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}", - composite_id, offset, map_len, e); + log::warn!( + "madvise(MADV_SEQUENTIAL) failed for {} off={} len={}: {}", + composite_id, + offset, + map_len, + e + ); } Ok(self.chunk(OpenFile { @@ -325,7 +365,7 @@ impl ReaderInt { // short enough to keep memory usage under control. It's hopefully // unnecessary to worry about disk seeks; the madvise call should cause // the kernel to read ahead. - let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1<<16)); + let end = std::cmp::min(file.map_len, file.map_pos.saturating_add(1 << 16)); let mut chunk = Vec::new(); let len = end.checked_sub(file.map_pos).unwrap(); chunk.reserve_exact(len); @@ -362,7 +402,10 @@ mod tests { #[tokio::test] async fn basic() { crate::testutil::init(); - let tmpdir = tempfile::Builder::new().prefix("moonfire-db-test-reader").tempdir().unwrap(); + let tmpdir = tempfile::Builder::new() + .prefix("moonfire-db-test-reader") + .tempdir() + .unwrap(); let fd = std::sync::Arc::new(super::super::Fd::open(tmpdir.path(), false).unwrap()); let reader = super::Reader::spawn("/path/goes/here", fd); std::fs::write(tmpdir.path().join("0123456789abcdef"), b"blah blah").unwrap(); diff --git a/server/src/mp4.rs b/server/src/mp4.rs index abdc2b4..80021d7 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -1799,19 +1799,22 @@ impl FileInner { /// /// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't /// happen because nothing should be touching Moonfire NVR's files but itself. - fn get_video_sample_data(&self, i: usize, r: Range) -> Box> + Send + Sync> { + fn get_video_sample_data( + &self, + i: usize, + r: Range, + ) -> Box> + Send + Sync> { let s = &self.segments[i]; let sr = s.s.sample_file_range(); let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) { - None => return Box::new(stream::iter(std::iter::once(Err( - wrap_error(format_err_t!(NotFound, "{}: stream not found", s.s.id)) - )))), + None => { + return Box::new(stream::iter(std::iter::once(Err(wrap_error( + format_err_t!(NotFound, "{}: stream not found", s.s.id), + ))))) + } Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start - r.start)), }; - Box::new(f - .map_ok(Chunk::from) - .map_err(wrap_error) - ) + Box::new(f.map_ok(Chunk::from).map_err(wrap_error)) } fn get_subtitle_sample_data(&self, i: usize, r: Range, len: u64) -> Result {