wrap Mutex and Condvar to handle poison

This centralizes a major source of `.unwrap()` throughout the code,
and one that would otherwise grow with upcoming changes. The new
error message should be more clear.
This commit is contained in:
Scott Lamb
2025-04-03 08:36:56 -07:00
parent 2903b680df
commit 0ccc6d0769
8 changed files with 120 additions and 62 deletions

View File

@@ -36,6 +36,7 @@ use crate::schema;
use crate::signal;
use base::clock::{self, Clocks};
use base::strutil::encode_size;
use base::Mutex;
use base::{bail, err, Error};
use base::{FastHashMap, FastHashSet};
use hashlink::LinkedHashMap;
@@ -52,7 +53,7 @@ use std::path::PathBuf;
use std::str;
use std::string::String;
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
use std::sync::MutexGuard;
use std::vec::Vec;
use tracing::warn;
use tracing::{error, info, trace};
@@ -562,7 +563,7 @@ impl Stream {
pub fn days(&self) -> days::Map<days::StreamValue> {
let mut days = self.committed_days.clone();
for u in &self.uncommitted {
let l = u.lock().unwrap();
let l = u.lock();
days.adjust(
l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)),
1,
@@ -896,7 +897,7 @@ impl LockedDatabase {
);
match stream.uncommitted.back() {
Some(s) => {
let l = s.lock().unwrap();
let l = s.lock();
r.prev_media_duration =
l.prev_media_duration + recording::Duration(l.media_duration_90k.into());
r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
@@ -936,7 +937,7 @@ impl LockedDatabase {
msg("can't sync un-added recording {id}")
);
}
let l = stream.uncommitted[stream.synced_recordings].lock().unwrap();
let l = stream.uncommitted[stream.synced_recordings].lock();
let bytes = i64::from(l.sample_file_bytes);
stream.bytes_to_add += bytes;
stream.fs_bytes_to_add += round_up(bytes);
@@ -1014,7 +1015,7 @@ impl LockedDatabase {
let mut new_duration = 0;
let mut new_runs = 0;
for i in 0..s.synced_recordings {
let l = s.uncommitted[i].lock().unwrap();
let l = s.uncommitted[i].lock();
raw::insert_recording(
&tx,
o,
@@ -1141,7 +1142,7 @@ impl LockedDatabase {
let u = s.uncommitted.pop_front().unwrap();
log.added
.push(CompositeId::new(stream_id, s.cum_recordings));
let l = u.lock().unwrap();
let l = u.lock();
s.cum_recordings += 1;
let wall_dur = recording::Duration(l.wall_duration_90k.into());
let media_dur = recording::Duration(l.media_duration_90k.into());
@@ -1310,7 +1311,7 @@ impl LockedDatabase {
raw::list_recordings_by_time(&self.conn, stream_id, desired_time.clone(), f)?;
for (i, u) in s.uncommitted.iter().enumerate() {
let row = {
let l = u.lock().unwrap();
let l = u.lock();
if l.video_samples > 0 {
let end = l.start + recording::Duration(l.wall_duration_90k as i64);
if l.start > desired_time.end || end < desired_time.start {
@@ -1351,7 +1352,7 @@ impl LockedDatabase {
);
for i in start..end {
let row = {
let l = s.uncommitted[i].lock().unwrap();
let l = s.uncommitted[i].lock();
if l.video_samples > 0 {
l.to_list_row(
CompositeId::new(stream_id, s.cum_recordings + i as i32),
@@ -1489,7 +1490,7 @@ impl LockedDatabase {
),
);
}
let l = s.uncommitted[i as usize].lock().unwrap();
let l = s.uncommitted[i as usize].lock();
return f(&RecordingPlayback {
video_index: &l.video_index,
});
@@ -2319,7 +2320,7 @@ impl<C: Clocks + Clone> Drop for Database<C> {
return; // don't flush while panicking.
}
if let Some(m) = self.db.take() {
if let Err(e) = m.into_inner().unwrap().flush(&self.clocks, "drop") {
if let Err(e) = m.into_inner().flush(&self.clocks, "drop") {
error!(err = %e.chain(), "final database flush failed");
}
}
@@ -2418,7 +2419,7 @@ impl<C: Clocks + Clone> Database<C> {
/// operations.
pub fn lock(&self) -> DatabaseGuard<C> {
let timer = clock::TimerGuard::new(&self.clocks, acquisition);
let db = self.db.as_ref().unwrap().lock().unwrap();
let db = self.db.as_ref().unwrap().lock();
drop(timer);
let _timer = clock::TimerGuard::<C, &'static str, fn() -> &'static str>::new(
&self.clocks,
@@ -2435,7 +2436,7 @@ impl<C: Clocks + Clone> Database<C> {
/// This allows verification that a newly opened database is in an acceptable state.
#[cfg(test)]
fn close(mut self) -> rusqlite::Connection {
self.db.take().unwrap().into_inner().unwrap().conn
self.db.take().unwrap().into_inner().conn
}
}