cleanup: s/unflushed sample/unindexed sample/

db/writer.rs used the word "unflushed" in two ways:

* something which has been communicated to the LockedDatabase object but
  not yet committed to disk with SQLite.

* a video sample (aka video frame) which has been written to the sample
  file but not yet included in the video index. This happens because the
  duration of a frame isn't known until the following frame. These are
  always also unflushed in the other sense of the word (as unfinished
  recordings are never committed). But they can't be seen by clients at
  all, where indexed but uncommitted video frames can.

Replace the latter with "unindexed" to make things more clear. And a
couple minor other style cleanups.
This commit is contained in:
Scott Lamb 2020-04-14 22:56:20 -07:00
parent 3ed397bacd
commit 9d7cdc0954

View File

@ -40,12 +40,11 @@ use failure::{Error, bail, format_err};
use fnv::FnvHashMap;
use parking_lot::Mutex;
use log::{debug, trace, warn};
use std::cmp::Ordering;
use std::cmp;
use std::convert::TryFrom;
use std::cmp::{self, Ordering};
use std::io;
use std::mem;
use std::sync::Arc;
use std::sync::mpsc;
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::Duration as StdDuration;
use time::{Duration, Timespec};
@ -228,7 +227,7 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
let mut n = 0;
db.delete_oldest_recordings(stream_id, &mut |row| {
if bytes_needed >= bytes_to_delete {
bytes_to_delete += row.sample_file_bytes as i64;
bytes_to_delete += i64::from(row.sample_file_bytes);
n += 1;
return true;
}
@ -580,7 +579,7 @@ struct InnerWriter<F: FileWriter> {
/// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end).
///
/// Invariant: this should always be `Some` (briefly violated during `write` call only).
unflushed_sample: Option<UnflushedSample>,
unindexed_sample: Option<UnindexedSample>,
}
/// Adjusts durations given by the camera to correct its clock frequency error.
@ -606,8 +605,8 @@ impl ClockAdjuster {
let (every_minus_1, ndir) = match local_time_delta {
Some(d) if d <= -2700 => (1999, 1),
Some(d) if d >= 2700 => (1999, -1),
Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1),
Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1),
Some(d) if d < -60 => ((60 * 90000) / -i32::try_from(d).unwrap() - 1, 1),
Some(d) if d > 60 => ((60 * 90000) / i32::try_from(d).unwrap() - 1, -1),
_ => (i32::max_value(), 0),
};
ClockAdjuster{
@ -633,9 +632,9 @@ impl ClockAdjuster {
}
#[derive(Copy, Clone)]
struct UnflushedSample {
struct UnindexedSample {
local_time: recording::Time,
pts_90k: i64, // relative to the start of the stream, not a single recording.
pts_90k: i64, // relative to the start of the run, not a single recording.
len: i32,
is_key: bool,
}
@ -664,7 +663,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
/// Opens a new writer.
/// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the
/// invariant that `unflushed_sample` is `Some`. The caller (`write`) is responsible for
/// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for
/// correcting this.
fn open(&mut self) -> Result<(), Error> {
let prev = match self.state {
@ -690,7 +689,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
hasher: blake3::Hasher::new(),
local_start: recording::Time(i64::max_value()),
adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)),
unflushed_sample: None,
unindexed_sample: None,
});
Ok(())
}
@ -713,24 +712,24 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
_ => unreachable!(),
};
// Note w's invariant that `unflushed_sample` is `None` may currently be violated.
// Note w's invariant that `unindexed_sample` is `None` may currently be violated.
// We must restore it on all success or error paths.
if let Some(unflushed) = w.unflushed_sample.take() {
let duration = (pts_90k - unflushed.pts_90k as i64) as i32;
if let Some(unindexed) = w.unindexed_sample.take() {
let duration = i32::try_from(pts_90k - i64::from(unindexed.pts_90k))?;
if duration <= 0 {
// Restore invariant.
w.unflushed_sample = Some(unflushed);
w.unindexed_sample = Some(unindexed);
bail!("pts not monotonically increasing; got {} then {}",
unflushed.pts_90k, pts_90k);
unindexed.pts_90k, pts_90k);
}
let duration = w.adjuster.adjust(duration);
let d = match w.add_sample(duration, unflushed.len, unflushed.is_key,
unflushed.local_time) {
let d = match w.add_sample(duration, unindexed.len, unindexed.is_key,
unindexed.local_time) {
Ok(d) => d,
Err(e) => {
// Restore invariant.
w.unflushed_sample = Some(unflushed);
w.unindexed_sample = Some(unindexed);
return Err(e);
},
};
@ -750,10 +749,10 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> {
let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining));
remaining = &remaining[written..];
}
w.unflushed_sample = Some(UnflushedSample {
w.unindexed_sample = Some(UnindexedSample {
local_time,
pts_90k,
len: pkt.len() as i32,
len: i32::try_from(pkt.len()).unwrap(),
is_key,
});
w.hasher.update(pkt);
@ -781,7 +780,7 @@ impl<F: FileWriter> InnerWriter<F> {
pkt_local_time: recording::Time) -> Result<i32, Error> {
let mut l = self.r.lock();
self.e.add_sample(duration_90k, bytes, is_key, &mut l)?;
let new = pkt_local_time - recording::Duration(l.duration_90k as i64);
let new = pkt_local_time - recording::Duration(i64::from(l.duration_90k));
self.local_start = cmp::min(self.local_start, new);
if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust.
l.start = self.local_start;
@ -791,15 +790,15 @@ impl<F: FileWriter> InnerWriter<F> {
fn close<C: Clocks + Clone>(mut self, channel: &SyncerChannel<F>, next_pts: Option<i64>,
db: &db::Database<C>, stream_id: i32) -> Result<PreviousWriter, Error> {
let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample");
let unindexed = self.unindexed_sample.take().expect("should always be an unindexed sample");
let (last_sample_duration, flags) = match next_pts {
None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32),
Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0),
Some(p) => (self.adjuster.adjust(i32::try_from(p - unindexed.pts_90k)?), 0),
};
let blake3 = self.hasher.finalize();
let (local_time_delta, run_offset, end);
let d = self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key,
unflushed.local_time)?;
let d = self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key,
unindexed.local_time)?;
// This always ends a live segment.
db.lock().send_live_segment(stream_id, db::LiveSegment {
@ -813,7 +812,7 @@ impl<F: FileWriter> InnerWriter<F> {
local_time_delta = self.local_start - l.start;
l.local_time_delta = local_time_delta;
l.sample_file_blake3 = Some(blake3.as_bytes().clone());
total_duration = recording::Duration(l.duration_90k as i64);
total_duration = recording::Duration(i64::from(l.duration_90k));
run_offset = l.run_offset;
end = l.start + total_duration;
}