diff --git a/db/writer.rs b/db/writer.rs index 2792e53..9956cae 100644 --- a/db/writer.rs +++ b/db/writer.rs @@ -40,12 +40,11 @@ use failure::{Error, bail, format_err}; use fnv::FnvHashMap; use parking_lot::Mutex; use log::{debug, trace, warn}; -use std::cmp::Ordering; -use std::cmp; +use std::convert::TryFrom; +use std::cmp::{self, Ordering}; use std::io; use std::mem; -use std::sync::Arc; -use std::sync::mpsc; +use std::sync::{Arc, mpsc}; use std::thread; use std::time::Duration as StdDuration; use time::{Duration, Timespec}; @@ -228,7 +227,7 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, let mut n = 0; db.delete_oldest_recordings(stream_id, &mut |row| { if bytes_needed >= bytes_to_delete { - bytes_to_delete += row.sample_file_bytes as i64; + bytes_to_delete += i64::from(row.sample_file_bytes); n += 1; return true; } @@ -580,7 +579,7 @@ struct InnerWriter { /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). /// /// Invariant: this should always be `Some` (briefly violated during `write` call only). - unflushed_sample: Option, + unindexed_sample: Option, } /// Adjusts durations given by the camera to correct its clock frequency error. @@ -606,8 +605,8 @@ impl ClockAdjuster { let (every_minus_1, ndir) = match local_time_delta { Some(d) if d <= -2700 => (1999, 1), Some(d) if d >= 2700 => (1999, -1), - Some(d) if d < -60 => ((60 * 90000) / -(d as i32) - 1, 1), - Some(d) if d > 60 => ((60 * 90000) / (d as i32) - 1, -1), + Some(d) if d < -60 => ((60 * 90000) / -i32::try_from(d).unwrap() - 1, 1), + Some(d) if d > 60 => ((60 * 90000) / i32::try_from(d).unwrap() - 1, -1), _ => (i32::max_value(), 0), }; ClockAdjuster{ @@ -633,9 +632,9 @@ impl ClockAdjuster { } #[derive(Copy, Clone)] -struct UnflushedSample { +struct UnindexedSample { local_time: recording::Time, - pts_90k: i64, // relative to the start of the stream, not a single recording. + pts_90k: i64, // relative to the start of the run, not a single recording. len: i32, is_key: bool, } @@ -664,7 +663,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { /// Opens a new writer. /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the - /// invariant that `unflushed_sample` is `Some`. The caller (`write`) is responsible for + /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for /// correcting this. fn open(&mut self) -> Result<(), Error> { let prev = match self.state { @@ -690,7 +689,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { hasher: blake3::Hasher::new(), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - unflushed_sample: None, + unindexed_sample: None, }); Ok(()) } @@ -713,24 +712,24 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { _ => unreachable!(), }; - // Note w's invariant that `unflushed_sample` is `None` may currently be violated. + // Note w's invariant that `unindexed_sample` is `None` may currently be violated. // We must restore it on all success or error paths. - if let Some(unflushed) = w.unflushed_sample.take() { - let duration = (pts_90k - unflushed.pts_90k as i64) as i32; + if let Some(unindexed) = w.unindexed_sample.take() { + let duration = i32::try_from(pts_90k - i64::from(unindexed.pts_90k))?; if duration <= 0 { // Restore invariant. - w.unflushed_sample = Some(unflushed); + w.unindexed_sample = Some(unindexed); bail!("pts not monotonically increasing; got {} then {}", - unflushed.pts_90k, pts_90k); + unindexed.pts_90k, pts_90k); } let duration = w.adjuster.adjust(duration); - let d = match w.add_sample(duration, unflushed.len, unflushed.is_key, - unflushed.local_time) { + let d = match w.add_sample(duration, unindexed.len, unindexed.is_key, + unindexed.local_time) { Ok(d) => d, Err(e) => { // Restore invariant. - w.unflushed_sample = Some(unflushed); + w.unindexed_sample = Some(unindexed); return Err(e); }, }; @@ -750,10 +749,10 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining)); remaining = &remaining[written..]; } - w.unflushed_sample = Some(UnflushedSample { + w.unindexed_sample = Some(UnindexedSample { local_time, pts_90k, - len: pkt.len() as i32, + len: i32::try_from(pkt.len()).unwrap(), is_key, }); w.hasher.update(pkt); @@ -781,7 +780,7 @@ impl InnerWriter { pkt_local_time: recording::Time) -> Result { let mut l = self.r.lock(); self.e.add_sample(duration_90k, bytes, is_key, &mut l)?; - let new = pkt_local_time - recording::Duration(l.duration_90k as i64); + let new = pkt_local_time - recording::Duration(i64::from(l.duration_90k)); self.local_start = cmp::min(self.local_start, new); if l.run_offset == 0 { // start time isn't anchored to previous recording's end; adjust. l.start = self.local_start; @@ -791,15 +790,15 @@ impl InnerWriter { fn close(mut self, channel: &SyncerChannel, next_pts: Option, db: &db::Database, stream_id: i32) -> Result { - let unflushed = self.unflushed_sample.take().expect("should always be an unflushed sample"); + let unindexed = self.unindexed_sample.take().expect("should always be an unindexed sample"); let (last_sample_duration, flags) = match next_pts { None => (self.adjuster.adjust(0), db::RecordingFlags::TrailingZero as i32), - Some(p) => (self.adjuster.adjust((p - unflushed.pts_90k) as i32), 0), + Some(p) => (self.adjuster.adjust(i32::try_from(p - unindexed.pts_90k)?), 0), }; let blake3 = self.hasher.finalize(); let (local_time_delta, run_offset, end); - let d = self.add_sample(last_sample_duration, unflushed.len, unflushed.is_key, - unflushed.local_time)?; + let d = self.add_sample(last_sample_duration, unindexed.len, unindexed.is_key, + unindexed.local_time)?; // This always ends a live segment. db.lock().send_live_segment(stream_id, db::LiveSegment { @@ -813,7 +812,7 @@ impl InnerWriter { local_time_delta = self.local_start - l.start; l.local_time_delta = local_time_delta; l.sample_file_blake3 = Some(blake3.as_bytes().clone()); - total_duration = recording::Duration(l.duration_90k as i64); + total_duration = recording::Duration(i64::from(l.duration_90k)); run_offset = l.run_offset; end = l.start + total_duration; }