diff --git a/server/base/clock.rs b/server/base/clock.rs
index 57219e2..00cf519 100644
--- a/server/base/clock.rs
+++ b/server/base/clock.rs
@@ -7,8 +7,8 @@
 //! Note these types are in a more standard nanosecond-based format, where
 //! [`crate::time`] uses Moonfire's 90 kHz time base.
 
+use crate::Mutex;
 use nix::sys::time::{TimeSpec, TimeValLike as _};
-use std::sync::Mutex;
 use std::sync::{mpsc, Arc};
 use std::thread;
 pub use std::time::Duration;
@@ -220,15 +220,15 @@ impl SimulatedClocks {
 impl Clocks for SimulatedClocks {
     fn realtime(&self) -> SystemTime {
-        self.0.boot + *self.0.uptime.lock().unwrap()
+        self.0.boot + *self.0.uptime.lock()
     }
     fn monotonic(&self) -> Instant {
-        Instant(TimeSpec::from(*self.0.uptime.lock().unwrap()))
+        Instant(TimeSpec::from(*self.0.uptime.lock()))
     }
 
     /// Advances the clock by the specified amount without actually sleeping.
     fn sleep(&self, how_long: Duration) {
-        let mut l = self.0.uptime.lock().unwrap();
+        let mut l = self.0.uptime.lock();
         *l += how_long;
     }
 
diff --git a/server/base/lib.rs b/server/base/lib.rs
index 457d6fa..bf4198d 100644
--- a/server/base/lib.rs
+++ b/server/base/lib.rs
@@ -14,3 +14,64 @@ pub use crate::error::{Error, ErrorBuilder, ErrorKind, ResultExt};
 pub use ahash::RandomState;
 pub type FastHashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
 pub type FastHashSet<K> = std::collections::HashSet<K, ahash::RandomState>;
+
+const NOT_POISONED: &str =
+    "not poisoned; this is a consequence of an earlier panic while holding this mutex; see logs.";
+
+/// [`std::sync::Mutex`] wrapper which always panics on encountering poison.
+#[derive(Default)]
+pub struct Mutex<T>(std::sync::Mutex<T>);
+
+impl<T> Mutex<T> {
+    #[inline]
+    pub const fn new(value: T) -> Self {
+        Mutex(std::sync::Mutex::new(value))
+    }
+
+    #[track_caller]
+    #[inline]
+    pub fn lock(&self) -> std::sync::MutexGuard<T> {
+        self.0.lock().expect(NOT_POISONED)
+    }
+
+    #[track_caller]
+    #[inline]
+    pub fn into_inner(self) -> T {
+        self.0.into_inner().expect(NOT_POISONED)
+    }
+}
+
+/// [`std::sync::Condvar`] wrapper which always panics on encountering poison.
+#[derive(Default)] +pub struct Condvar(std::sync::Condvar); + +impl Condvar { + #[inline] + pub const fn new() -> Self { + Self(std::sync::Condvar::new()) + } + + #[track_caller] + #[inline] + pub fn wait_timeout_while<'a, T, F>( + &self, + guard: std::sync::MutexGuard<'a, T>, + dur: std::time::Duration, + condition: F, + ) -> (std::sync::MutexGuard<'a, T>, std::sync::WaitTimeoutResult) + where + F: FnMut(&mut T) -> bool, + { + self.0 + .wait_timeout_while(guard, dur, condition) + .expect(NOT_POISONED) + } +} + +impl std::ops::Deref for Condvar { + type Target = std::sync::Condvar; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/server/base/shutdown.rs b/server/base/shutdown.rs index 78011c1..c1b0aee 100644 --- a/server/base/shutdown.rs +++ b/server/base/shutdown.rs @@ -15,9 +15,10 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; +use crate::Condvar; +use crate::Mutex; use futures::Future; use slab::Slab; -use std::sync::{Condvar, Mutex}; #[derive(Debug)] pub struct ShutdownError; @@ -47,7 +48,6 @@ impl Drop for Sender { .0 .wakers .lock() - .unwrap() .take() .expect("only the single Sender takes the slab"); for w in wakers.drain() { @@ -78,7 +78,7 @@ const NO_WAKER: usize = usize::MAX; impl Receiver { pub fn check(&self) -> Result<(), ShutdownError> { - if self.0.wakers.lock().unwrap().is_none() { + if self.0.wakers.lock().is_none() { Err(ShutdownError) } else { Ok(()) @@ -107,12 +107,11 @@ impl Receiver { } pub fn wait_for(&self, timeout: std::time::Duration) -> Result<(), ShutdownError> { - let l = self.0.wakers.lock().unwrap(); + let l = self.0.wakers.lock(); let result = self .0 .condvar - .wait_timeout_while(l, timeout, |wakers| wakers.is_some()) - .unwrap(); + .wait_timeout_while(l, timeout, |wakers| wakers.is_some()); if result.1.timed_out() { Ok(()) } else { @@ -122,7 +121,7 @@ impl Receiver { } fn poll_impl(inner: &Inner, waker_i: &mut usize, cx: &mut Context<'_>) -> Poll<()> { - let mut l = inner.wakers.lock().unwrap(); + let mut l = inner.wakers.lock(); let wakers = match &mut *l { None => return Poll::Ready(()), Some(w) => w, @@ -152,7 +151,7 @@ impl Drop for ReceiverRefFuture<'_> { if self.waker_i == NO_WAKER { return; } - let mut l = self.receiver.0.wakers.lock().unwrap(); + let mut l = self.receiver.0.wakers.lock(); if let Some(wakers) = &mut *l { wakers.remove(self.waker_i); } @@ -173,7 +172,7 @@ impl Drop for ReceiverFuture { if self.waker_i == NO_WAKER { return; } - let mut l = self.receiver.wakers.lock().unwrap(); + let mut l = self.receiver.wakers.lock(); if let Some(wakers) = &mut *l { wakers.remove(self.waker_i); } diff --git a/server/db/db.rs b/server/db/db.rs index 011236c..849323d 100644 --- a/server/db/db.rs +++ b/server/db/db.rs @@ -36,6 +36,7 @@ use crate::schema; use crate::signal; use base::clock::{self, Clocks}; use base::strutil::encode_size; +use base::Mutex; use base::{bail, err, Error}; use base::{FastHashMap, FastHashSet}; use hashlink::LinkedHashMap; @@ -52,7 +53,7 @@ use std::path::PathBuf; use std::str; use std::string::String; use std::sync::Arc; -use std::sync::{Mutex, MutexGuard}; +use std::sync::MutexGuard; use std::vec::Vec; use tracing::warn; use tracing::{error, info, trace}; @@ -562,7 +563,7 @@ impl Stream { pub fn days(&self) -> days::Map { let mut days = self.committed_days.clone(); for u in &self.uncommitted { - let l = u.lock().unwrap(); + let l = u.lock(); days.adjust( l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)), 1, @@ -896,7 +897,7 @@ impl LockedDatabase { 
         );
         match stream.uncommitted.back() {
             Some(s) => {
-                let l = s.lock().unwrap();
+                let l = s.lock();
                 r.prev_media_duration =
                     l.prev_media_duration + recording::Duration(l.media_duration_90k.into());
                 r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
@@ -936,7 +937,7 @@
                 msg("can't sync un-added recording {id}")
             );
         }
-        let l = stream.uncommitted[stream.synced_recordings].lock().unwrap();
+        let l = stream.uncommitted[stream.synced_recordings].lock();
         let bytes = i64::from(l.sample_file_bytes);
         stream.bytes_to_add += bytes;
         stream.fs_bytes_to_add += round_up(bytes);
@@ -1014,7 +1015,7 @@
             let mut new_duration = 0;
             let mut new_runs = 0;
             for i in 0..s.synced_recordings {
-                let l = s.uncommitted[i].lock().unwrap();
+                let l = s.uncommitted[i].lock();
                 raw::insert_recording(
                     &tx,
                     o,
@@ -1141,7 +1142,7 @@
                 let u = s.uncommitted.pop_front().unwrap();
                 log.added
                     .push(CompositeId::new(stream_id, s.cum_recordings));
-                let l = u.lock().unwrap();
+                let l = u.lock();
                 s.cum_recordings += 1;
                 let wall_dur = recording::Duration(l.wall_duration_90k.into());
                 let media_dur = recording::Duration(l.media_duration_90k.into());
@@ -1310,7 +1311,7 @@
         raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?;
         for (i, u) in s.uncommitted.iter().enumerate() {
             let row = {
-                let l = u.lock().unwrap();
+                let l = u.lock();
                 if l.video_samples > 0 {
                     let end = l.start + recording::Duration(l.wall_duration_90k as i64);
                     if l.start > desired_time.end || end < desired_time.start {
@@ -1351,7 +1352,7 @@
         );
         for i in start..end {
             let row = {
-                let l = s.uncommitted[i].lock().unwrap();
+                let l = s.uncommitted[i].lock();
                 if l.video_samples > 0 {
                     l.to_list_row(
                         CompositeId::new(stream_id, s.cum_recordings + i as i32),
@@ -1489,7 +1490,7 @@
                 ),
             );
         }
-            let l = s.uncommitted[i as usize].lock().unwrap();
+            let l = s.uncommitted[i as usize].lock();
            return f(&RecordingPlayback {
                video_index: &l.video_index,
            });
        }
@@ -2319,7 +2320,7 @@ impl<C: Clocks + Clone> Drop for Database<C> {
             return; // don't flush while panicking.
         }
         if let Some(m) = self.db.take() {
-            if let Err(e) = m.into_inner().unwrap().flush(&self.clocks, "drop") {
+            if let Err(e) = m.into_inner().flush(&self.clocks, "drop") {
                 error!(err = %e.chain(), "final database flush failed");
             }
         }
@@ -2418,7 +2419,7 @@
     /// operations.
     pub fn lock(&self) -> DatabaseGuard<C> {
         let timer = clock::TimerGuard::new(&self.clocks, acquisition);
-        let db = self.db.as_ref().unwrap().lock().unwrap();
+        let db = self.db.as_ref().unwrap().lock();
         drop(timer);
         let _timer = clock::TimerGuard::<C, &'static str, fn() -> &'static str>::new(
             &self.clocks,
@@ -2435,7 +2436,7 @@
     /// This allows verification that a newly opened database is in an acceptable state.
#[cfg(test)] fn close(mut self) -> rusqlite::Connection { - self.db.take().unwrap().into_inner().unwrap().conn + self.db.take().unwrap().into_inner().conn } } diff --git a/server/db/writer.rs b/server/db/writer.rs index cf70509..b2ee6d3 100644 --- a/server/db/writer.rs +++ b/server/db/writer.rs @@ -10,13 +10,13 @@ use crate::recording::{self, MAX_RECORDING_WALL_DURATION}; use base::clock::{self, Clocks}; use base::shutdown::ShutdownError; use base::FastHashMap; +use base::Mutex; use base::{bail, err, Error}; use std::cmp::{self, Ordering}; use std::convert::TryFrom; use std::io; use std::mem; use std::path::PathBuf; -use std::sync::Mutex; use std::sync::{mpsc, Arc}; use std::thread; use tracing::{debug, trace, warn}; @@ -880,7 +880,7 @@ impl InnerWriter { db: &db::Database, stream_id: i32, ) -> Result<(), Error> { - let mut l = self.r.lock().unwrap(); + let mut l = self.r.lock(); // design/time.md explains these time manipulations in detail. let prev_media_duration_90k = l.media_duration_90k; @@ -969,7 +969,7 @@ impl InnerWriter { // This always ends a live segment. let wall_duration; { - let mut l = self.r.lock().unwrap(); + let mut l = self.r.lock(); l.flags = flags; l.local_time_delta = self.local_start - l.start; l.sample_file_blake3 = Some(*blake3.as_bytes()); @@ -1012,11 +1012,11 @@ mod tests { use crate::recording; use crate::testutil; use base::clock::{Clocks, SimulatedClocks}; + use base::Mutex; use std::collections::VecDeque; use std::io; use std::sync::mpsc; use std::sync::Arc; - use std::sync::Mutex; use tracing::{trace, warn}; #[derive(Clone)] @@ -1039,10 +1039,10 @@ mod tests { MockDir(Arc::new(Mutex::new(VecDeque::new()))) } fn expect(&self, action: MockDirAction) { - self.0.lock().unwrap().push_back(action); + self.0.lock().push_back(action); } fn ensure_done(&self) { - assert_eq!(self.0.lock().unwrap().len(), 0); + assert_eq!(self.0.lock().len(), 0); } } @@ -1053,7 +1053,6 @@ mod tests { match self .0 .lock() - .unwrap() .pop_front() .expect("got create_file with no expectation") { @@ -1068,7 +1067,6 @@ mod tests { match self .0 .lock() - .unwrap() .pop_front() .expect("got sync with no expectation") { @@ -1080,7 +1078,6 @@ mod tests { match self .0 .lock() - .unwrap() .pop_front() .expect("got unlink_file with no expectation") { @@ -1096,7 +1093,7 @@ mod tests { impl Drop for MockDir { fn drop(&mut self) { if !::std::thread::panicking() { - assert_eq!(self.0.lock().unwrap().len(), 0); + assert_eq!(self.0.lock().len(), 0); } } } @@ -1116,10 +1113,10 @@ mod tests { MockFile(Arc::new(Mutex::new(VecDeque::new()))) } fn expect(&self, action: MockFileAction) { - self.0.lock().unwrap().push_back(action); + self.0.lock().push_back(action); } fn ensure_done(&self) { - assert_eq!(self.0.lock().unwrap().len(), 0); + assert_eq!(self.0.lock().len(), 0); } } @@ -1128,7 +1125,6 @@ mod tests { match self .0 .lock() - .unwrap() .pop_front() .expect("got sync_all with no expectation") { @@ -1140,7 +1136,6 @@ mod tests { match self .0 .lock() - .unwrap() .pop_front() .expect("got write with no expectation") { diff --git a/server/src/cmds/config/dirs.rs b/server/src/cmds/config/dirs.rs index f0801b1..0ccf741 100644 --- a/server/src/cmds/config/dirs.rs +++ b/server/src/cmds/config/dirs.rs @@ -4,6 +4,7 @@ use base::strutil::{decode_size, encode_size}; use base::Error; +use base::Mutex; use cursive::traits::{Nameable, Resizable}; use cursive::view::Scrollable; use cursive::Cursive; @@ -11,7 +12,7 @@ use cursive::{views, With}; use db::writer; use std::collections::BTreeMap; use std::path::Path; 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use tracing::{debug, trace};
 
 use super::tab_complete::TabCompleteEditView;
@@ -119,7 +120,7 @@ fn confirm_deletion(model: &Mutex<Model>, siv: &mut Cursive, to_delete: i64) {
 }
 
 fn actually_delete(model: &Mutex<Model>, siv: &mut Cursive) {
-    let model = model.lock().unwrap();
+    let model = model.lock();
     let new_limits: Vec<_> = model
         .streams
         .iter()
@@ -147,7 +148,7 @@ fn actually_delete(model: &Mutex<Model>, siv: &mut Cursive) {
 
 fn press_change(model: &Arc<Mutex<Model>>, siv: &mut Cursive) {
     let to_delete = {
-        let l = model.lock().unwrap();
+        let l = model.lock();
         if l.errors > 0 {
             return;
         }
@@ -185,7 +186,7 @@ fn press_change(model: &Arc<Mutex<Model>>, siv: &mut Cursive) {
         siv.add_layer(dialog);
     } else {
         siv.pop_layer();
-        update_limits(&model.lock().unwrap(), siv);
+        update_limits(&model.lock(), siv);
     }
 }
 
@@ -384,13 +385,13 @@ fn edit_dir_dialog(db: &Arc<db::Database>, siv: &mut Cursive, dir_id: i32) {
             .child(views::TextView::new("usage").fixed_width(BYTES_WIDTH))
             .child(views::TextView::new("limit").fixed_width(BYTES_WIDTH)),
     );
-    let l = model.lock().unwrap();
+    let l = model.lock();
     for (&id, stream) in &l.streams {
         let mut record_cb = views::Checkbox::new();
         record_cb.set_checked(stream.record);
         record_cb.set_on_change({
             let model = model.clone();
-            move |_siv, record| edit_record(&mut model.lock().unwrap(), id, record)
+            move |_siv, record| edit_record(&mut model.lock(), id, record)
         });
         list.add_child(
             &stream.label,
@@ -403,7 +404,7 @@
                 .on_edit({
                     let model = model.clone();
                     move |siv, content, _pos| {
-                        edit_limit(&mut model.lock().unwrap(), siv, id, content)
+                        edit_limit(&mut model.lock(), siv, id, content)
                     }
                 })
                 .on_submit({
diff --git a/server/src/cmds/config/tab_complete.rs b/server/src/cmds/config/tab_complete.rs
index 42a7ef1..48a74f1 100644
--- a/server/src/cmds/config/tab_complete.rs
+++ b/server/src/cmds/config/tab_complete.rs
@@ -2,7 +2,8 @@
 // Copyright (C) 2020 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
 // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
-use std::sync::{Arc, Mutex};
+use base::Mutex;
+use std::sync::Arc;
 
 use cursive::{
     direction::Direction,
@@ -37,25 +38,25 @@ impl TabCompleteEditView {
     }
 
     pub fn get_content(&self) -> Arc<String> {
-        self.edit_view.lock().unwrap().get_content()
+        self.edit_view.lock().get_content()
     }
 }
 
 impl View for TabCompleteEditView {
     fn draw(&self, printer: &Printer) {
-        self.edit_view.lock().unwrap().draw(printer)
+        self.edit_view.lock().draw(printer)
     }
 
     fn layout(&mut self, size: Vec2) {
-        self.edit_view.lock().unwrap().layout(size)
+        self.edit_view.lock().layout(size)
     }
 
     fn take_focus(&mut self, source: Direction) -> Result<EventResult, CannotFocus> {
-        self.edit_view.lock().unwrap().take_focus(source)
+        self.edit_view.lock().take_focus(source)
     }
 
     fn on_event(&mut self, event: Event) -> EventResult {
-        if !self.edit_view.lock().unwrap().is_enabled() {
+        if !self.edit_view.lock().is_enabled() {
             return EventResult::Ignored;
         }
 
@@ -66,12 +67,12 @@
                 EventResult::consumed()
             }
         } else {
-            self.edit_view.lock().unwrap().on_event(event)
+            self.edit_view.lock().on_event(event)
         }
     }
 
     fn important_area(&self, view_size: Vec2) -> Rect {
-        self.edit_view.lock().unwrap().important_area(view_size)
+        self.edit_view.lock().important_area(view_size)
     }
 }
 
@@ -80,10 +81,10 @@ fn tab_complete(
     tab_completer: TabCompleteFn,
     autofill_one: bool,
 ) -> EventResult {
-    let completions = tab_completer(edit_view.lock().unwrap().get_content().as_str());
+    let completions = tab_completer(edit_view.lock().get_content().as_str());
     EventResult::with_cb_once(move |siv| match *completions {
         [] => {}
-        [ref completion] if autofill_one => edit_view.lock().unwrap().set_content(completion)(siv),
+        [ref completion] if autofill_one => edit_view.lock().set_content(completion)(siv),
         [..] => {
             siv.add_layer(TabCompletePopup {
                 popup: views::MenuPopup::new(Arc::new({
@@ -91,7 +92,7 @@
                     for completion in completions {
                         let edit_view = edit_view.clone();
                         tree.add_leaf(completion.clone(), move |siv| {
-                            edit_view.lock().unwrap().set_content(&completion)(siv)
+                            edit_view.lock().set_content(&completion)(siv)
                         })
                     }
                 })
@@ -114,7 +115,7 @@ impl TabCompletePopup {
         let tab_completer = self.tab_completer.clone();
         EventResult::with_cb_once(move |s| {
             s.pop_layer();
-            edit_view.lock().unwrap().on_event(event).process(s);
+            edit_view.lock().on_event(event).process(s);
             tab_complete(edit_view, tab_completer, false).process(s);
         })
     }
diff --git a/server/src/streamer.rs b/server/src/streamer.rs
index 4fdce52..1424f41 100644
--- a/server/src/streamer.rs
+++ b/server/src/streamer.rs
@@ -288,12 +288,12 @@ mod tests {
     use crate::stream::{self, Stream};
     use base::clock::{self, Clocks};
+    use base::Mutex;
     use base::{bail, Error};
     use db::{recording, testutil, CompositeId};
     use std::cmp;
     use std::convert::TryFrom;
     use std::sync::Arc;
-    use std::sync::Mutex;
     use tracing::trace;
 
     struct ProxyingStream {
@@ -388,7 +388,7 @@
         _options: stream::Options,
     ) -> Result<Box<dyn Stream>, Error> {
         assert_eq!(&url, &self.expected_url);
-        let mut l = self.streams.lock().unwrap();
+        let mut l = self.streams.lock();
         match l.pop() {
             Some(stream) => {
                 trace!("MockOpener returning next stream");
@@ -396,7 +396,7 @@
             }
             None => {
                 trace!("MockOpener shutting down");
-                self.shutdown_tx.lock().unwrap().take();
+                self.shutdown_tx.lock().take();
                 bail!(Cancelled, msg("done"))
             }
         }
@@ -478,7 +478,7 @@
             .unwrap();
         }
         stream.run();
-        assert!(opener.streams.lock().unwrap().is_empty());
+        assert!(opener.streams.lock().is_empty());
         db.syncer_channel.flush();
         let db = db.db.lock();
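
For reviewers, a minimal usage sketch of the pattern this change adopts. This is not part of the patch: the `Stats` struct, the panicking thread, and `main` are invented for illustration; the `Mutex` wrapper below mirrors the one added in server/base/lib.rs.

// Illustrative only: shows how call sites change and what the poison panic looks like.
use std::sync::Arc;
use std::thread;

const NOT_POISONED: &str =
    "not poisoned; this is a consequence of an earlier panic while holding this mutex; see logs.";

#[derive(Default)]
pub struct Mutex<T>(std::sync::Mutex<T>);

impl<T> Mutex<T> {
    pub const fn new(value: T) -> Self {
        Mutex(std::sync::Mutex::new(value))
    }

    // Unlike `std::sync::Mutex::lock`, this never surfaces a `PoisonError`;
    // it panics at the caller's location (via #[track_caller]) with one shared message.
    #[track_caller]
    pub fn lock(&self) -> std::sync::MutexGuard<T> {
        self.0.lock().expect(NOT_POISONED)
    }
}

#[derive(Default)]
struct Stats {
    frames: u64,
}

fn main() {
    let stats = Arc::new(Mutex::new(Stats::default()));

    // Call sites drop the `.unwrap()`: `stats.lock()` rather than `stats.lock().unwrap()`.
    stats.lock().frames += 1;

    // If some thread panics while holding the guard, the inner mutex is poisoned...
    let stats2 = Arc::clone(&stats);
    let _ = thread::spawn(move || {
        let _guard = stats2.lock();
        panic!("simulated bug while holding the lock");
    })
    .join();

    // ...and the next lock() panics with the NOT_POISONED message instead of
    // returning a PoisonError, pointing readers back at the earlier panic in the logs:
    // stats.lock().frames += 1; // uncommenting this line now panics
}

The point of the wrapper is that a poisoned lock is treated purely as the consequence of an earlier panic: the shared `expect` message and `#[track_caller]` make the resulting panic land at the call site while directing readers to the original failure in the logs, and every `.lock().unwrap()` in the codebase collapses to `.lock()`.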