massive error overhaul

* fully stop using the ancient `failure` crate in favor of our own error type
* set an `ErrorKind` on everything
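
The pattern at each converted call site is the same: name an `ErrorKind`, attach a human-readable `msg(...)`, and optionally chain the underlying cause with `source(...)`. Below is a minimal, self-contained sketch of that shape; the `Error` type, the macro definitions, and the `stream_bytes` helper are illustrative stand-ins written for this note, not the real `base` crate definitions.

```rust
// Hypothetical stand-in for the `base` error API; the names mirror the call
// sites in this diff, but these definitions are illustrative only.
use std::collections::HashMap;
use std::fmt;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[allow(dead_code)]
enum ErrorKind {
    Cancelled,
    FailedPrecondition,
    Internal,
    InvalidArgument,
    NotFound,
    OutOfRange,
    Unknown,
}

#[derive(Debug)]
struct Error {
    kind: ErrorKind,
    msg: Option<String>,
    source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self.kind)?;
        if let Some(m) = &self.msg {
            write!(f, ": {m}")?;
        }
        if let Some(s) = &self.source {
            write!(f, " (caused by: {s})")?;
        }
        Ok(())
    }
}

/// Builds an `Error` from a kind plus optional `msg(...)` / `source(...)` clauses.
macro_rules! err {
    ($kind:ident $(,)?) => {
        Error { kind: ErrorKind::$kind, msg: None, source: None }
    };
    ($kind:ident, msg($($m:tt)*) $(,)?) => {
        Error { kind: ErrorKind::$kind, msg: Some(format!($($m)*)), source: None }
    };
    ($kind:ident, source($e:expr) $(,)?) => {
        Error { kind: ErrorKind::$kind, msg: None, source: Some(Box::new($e)) }
    };
    ($kind:ident, msg($($m:tt)*), source($e:expr) $(,)?) => {
        Error {
            kind: ErrorKind::$kind,
            msg: Some(format!($($m)*)),
            source: Some(Box::new($e)),
        }
    };
}

/// Returns early from the enclosing function with `Err(err!(...))`.
macro_rules! bail {
    ($($t:tt)*) => {
        return Err(err!($($t)*))
    };
}

// A call site in the style this commit converts everything to (hypothetical
// helper, not code from the diff).
fn stream_bytes(streams: &HashMap<i32, u64>, stream_id: i32) -> Result<u64, Error> {
    let Some(&bytes) = streams.get(&stream_id) else {
        bail!(NotFound, msg("no such stream {}", stream_id));
    };
    Ok(bytes)
}

fn main() {
    let streams = HashMap::from([(1, 4096u64)]);
    // Prints "NotFound: no such stream 2".
    println!("{}", stream_bytes(&streams, 2).unwrap_err());
}
```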
commit 64ca096ff3
parent 6a5b751bd6
Author: Scott Lamb
Date:   2023-07-09 22:04:17 -07:00

54 changed files with 1493 additions and 1108 deletions

@@ -9,7 +9,7 @@ use crate::dir;
 use crate::recording::{self, MAX_RECORDING_WALL_DURATION};
 use base::clock::{self, Clocks};
 use base::shutdown::ShutdownError;
-use failure::{bail, format_err, Error};
+use base::{bail, err, Error};
 use fnv::FnvHashMap;
 use std::cmp::{self, Ordering};
 use std::convert::TryFrom;
@@ -218,10 +218,9 @@ pub fn lower_retention(
         for l in limits {
             let (fs_bytes_before, extra);
             {
-                let stream = db
-                    .streams_by_id()
-                    .get(&l.stream_id)
-                    .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
+                let Some(stream) = db.streams_by_id().get(&l.stream_id) else {
+                    bail!(NotFound, msg("no such stream {}", l.stream_id));
+                };
                 fs_bytes_before =
                     stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete;
                 extra = stream.config.retain_bytes - l.limit;
@@ -245,7 +244,7 @@ fn delete_recordings(
 ) -> Result<(), Error> {
     let fs_bytes_needed = {
         let stream = match db.streams_by_id().get(&stream_id) {
-            None => bail!("no stream {}", stream_id),
+            None => bail!(NotFound, msg("no stream {stream_id}")),
             Some(s) => s,
         };
         stream.fs_bytes + stream.fs_bytes_to_add - stream.fs_bytes_to_delete + extra_bytes_needed
@@ -326,7 +325,7 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         let d = l
             .sample_file_dirs_by_id()
             .get(&dir_id)
-            .ok_or_else(|| format_err!("no dir {}", dir_id))?;
+            .ok_or_else(|| err!(NotFound, msg("no dir {dir_id}")))?;
         let dir = d.get()?;
         // Abandon files.
@@ -345,17 +344,20 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         let to_abandon = list_files_to_abandon(&dir, streams_to_next)?;
         let mut undeletable = 0;
         for &id in &to_abandon {
-            if let Err(e) = dir.unlink_file(id) {
-                if e == nix::Error::ENOENT {
-                    warn!("dir: abandoned recording {} already deleted!", id);
+            if let Err(err) = dir.unlink_file(id) {
+                if err == nix::Error::ENOENT {
+                    warn!(%id, "dir: abandoned recording already deleted");
                 } else {
-                    warn!("dir: Unable to unlink abandoned recording {}: {}", id, e);
+                    warn!(%err, %id, "dir: unable to unlink abandoned recording");
                     undeletable += 1;
                 }
             }
         }
         if undeletable > 0 {
-            bail!("Unable to delete {} abandoned recordings.", undeletable);
+            bail!(
+                Unknown,
+                msg("unable to delete {undeletable} abandoned recordings; see logs")
+            );
         }
         Ok((
@@ -410,8 +412,8 @@ impl<C: Clocks + Clone> Syncer<C, Arc<dir::SampleFileDir>> {
         }
         if errors > 0 {
             bail!(
-                "Unable to unlink {} files (see earlier warning messages for details)",
-                errors
+                Unknown,
+                msg("unable to unlink {errors} files (see earlier warning messages for details)"),
             );
         }
         self.dir.sync()?;
@@ -718,7 +720,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
             WriterState::Unopened => None,
             WriterState::Open(ref o) => {
                 if o.video_sample_entry_id != video_sample_entry_id {
-                    bail!("inconsistent video_sample_entry_id");
+                    bail!(Internal, msg("inconsistent video_sample_entry_id"));
                 }
                 return Ok(());
             }
@@ -738,7 +740,8 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         )?;
         let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || {
             self.dir.create_file(id)
-        })?;
+        })
+        .map_err(|e| err!(Cancelled, source(e)))?;
         self.state = WriterState::Open(InnerWriter {
             f,
@@ -757,7 +760,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         Ok(match self.state {
             WriterState::Unopened => false,
             WriterState::Closed(_) => true,
-            WriterState::Open(_) => bail!("open!"),
+            WriterState::Open(_) => bail!(Internal, msg("open!")),
         })
     }
@@ -786,9 +789,12 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
         if duration <= 0 {
             w.unindexed_sample = Some(unindexed); // restore invariant.
             bail!(
-                "pts not monotonically increasing; got {} then {}",
-                unindexed.pts_90k,
-                pts_90k
+                InvalidArgument,
+                msg(
+                    "pts not monotonically increasing; got {} then {}",
+                    unindexed.pts_90k,
+                    pts_90k,
+                ),
             );
         }
         let duration = match i32::try_from(duration) {
@@ -796,9 +802,12 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
             Err(_) => {
                 w.unindexed_sample = Some(unindexed); // restore invariant.
                 bail!(
-                    "excessive pts jump from {} to {}",
-                    unindexed.pts_90k,
-                    pts_90k
+                    InvalidArgument,
+                    msg(
+                        "excessive pts jump from {} to {}",
+                        unindexed.pts_90k,
+                        pts_90k,
+                    ),
                 )
             }
         };
@@ -822,10 +831,10 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
                 Err(e) => {
                     // close() will do nothing because unindexed_sample will be None.
                     tracing::warn!(
-                        "Abandoning incompletely written recording {} on shutdown",
+                        "abandoning incompletely written recording {} on shutdown",
                         w.id
                     );
-                    return Err(e.into());
+                    bail!(Cancelled, source(e));
                 }
             };
             remaining = &remaining[written..];
@@ -894,9 +903,8 @@ impl<F: FileWriter> InnerWriter<F> {
             + i32::try_from(clamp(local_start.0 - start.0, -limit, limit)).unwrap();
         if wall_duration_90k > i32::try_from(MAX_RECORDING_WALL_DURATION).unwrap() {
             bail!(
-                "Duration {} exceeds maximum {}",
-                wall_duration_90k,
-                MAX_RECORDING_WALL_DURATION
+                OutOfRange,
+                msg("Duration {wall_duration_90k} exceeds maximum {MAX_RECORDING_WALL_DURATION}"),
             );
         }
         l.wall_duration_90k = wall_duration_90k;
@@ -926,14 +934,29 @@ impl<F: FileWriter> InnerWriter<F> {
         reason: Option<String>,
     ) -> Result<PreviousWriter, Error> {
         let unindexed = self.unindexed_sample.take().ok_or_else(|| {
-            format_err!(
-                "Unable to add recording {} to database due to aborted write",
-                self.id
+            err!(
+                FailedPrecondition,
+                msg(
+                    "unable to add recording {} to database due to aborted write",
+                    self.id,
+                ),
             )
         })?;
         let (last_sample_duration, flags) = match next_pts {
             None => (0, db::RecordingFlags::TrailingZero as i32),
-            Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0),
+            Some(p) => (
+                i32::try_from(p - unindexed.pts_90k).map_err(|_| {
+                    err!(
+                        OutOfRange,
+                        msg(
+                            "pts {} following {} creates invalid duration",
+                            p,
+                            unindexed.pts_90k
+                        )
+                    )
+                })?,
+                0,
+            ),
         };
         let blake3 = self.hasher.finalize();
         let (run_offset, end);