From 0b0f4ec9ed871b055e13617c3acdd9d911f5ec70 Mon Sep 17 00:00:00 2001
From: Scott Lamb
Date: Fri, 28 Dec 2018 15:06:32 -0600
Subject: [PATCH] NLL-inspired simplifications to db.rs

* remove intermediate bool from adjust_day.

* rewrite LockedDatabase::list_aggregate_recordings.

  I started by collapsing the flush into the first part of the if, in a
  similar way to adjust_day. But then I refactored more and ended up with
  a structure that probably would have been allowed with the old lexical
  borrow checker. I think it's more readable, and it does 1 btree
  operation per row where before it did 2 or 3.
---
 db/db.rs | 105 ++++++++++++++++++++++++++++---------------------------
 1 file changed, 54 insertions(+), 51 deletions(-)

diff --git a/db/db.rs b/db/db.rs
index 03fd085..1a6626b 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -166,6 +166,27 @@ pub struct ListAggregatedRecordingsRow {
     pub growing: bool,
 }
 
+impl ListAggregatedRecordingsRow {
+    fn from(row: ListRecordingsRow) -> Self {
+        let recording_id = row.id.recording();
+        let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
+        let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
+        ListAggregatedRecordingsRow {
+            time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
+            ids: recording_id .. recording_id+1,
+            video_samples: row.video_samples as i64,
+            video_sync_samples: row.video_sync_samples as i64,
+            sample_file_bytes: row.sample_file_bytes as i64,
+            video_sample_entry_id: row.video_sample_entry_id,
+            stream_id: row.id.stream(),
+            run_start_id: recording_id - row.run_offset,
+            open_id: row.open_id,
+            first_uncommitted: if uncommitted { Some(recording_id) } else { None },
+            growing,
+        }
+    }
+}
+
 /// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`.
 #[derive(Debug)]
 pub struct RecordingPlayback<'a> {
@@ -454,13 +475,10 @@ fn adjust_day(day: StreamDayKey, delta: StreamDayValue,
     match m.entry(day) {
         Entry::Vacant(e) => { e.insert(delta); },
         Entry::Occupied(mut e) => {
-            let remove = {
-                let v = e.get_mut();
-                v.recordings += delta.recordings;
-                v.duration += delta.duration;
-                v.recordings == 0
-            };
-            if remove {
+            let v = e.get_mut();
+            v.recordings += delta.recordings;
+            v.duration += delta.duration;
+            if v.recordings == 0 {
                 e.remove_entry();
             }
         },
@@ -1124,58 +1142,43 @@ impl LockedDatabase {
         self.list_recordings_by_time(stream_id, desired_time, &mut |row| {
             let recording_id = row.id.recording();
             let run_start_id = recording_id - row.run_offset;
-            let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
-                let new_dur = a.time.end - a.time.start +
-                              recording::Duration(row.duration_90k as i64);
-                a.ids.end != recording_id || row.video_sample_entry_id != a.video_sample_entry_id ||
-                    new_dur >= forced_split
-            } else {
-                false
-            };
-            if needs_flush {
-                let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None");
-                f(&a)?;
-            }
             let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
             let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
             use std::collections::btree_map::Entry;
             match aggs.entry(run_start_id) {
                 Entry::Occupied(mut e) => {
                     let a = e.get_mut();
-                    if a.time.end != row.start {
-                        bail!("stream {} recording {} ends at {}; {} starts at {}; expected same",
-                              stream_id, a.ids.end - 1, a.time.end, row.id, row.start);
+                    let new_dur = a.time.end - a.time.start +
+                                  recording::Duration(row.duration_90k as i64);
+                    let needs_flush =
+                        a.ids.end != recording_id ||
+                        row.video_sample_entry_id != a.video_sample_entry_id ||
+                        new_dur >= forced_split;
+                    if needs_flush {  // flush then start a new entry.
+                        f(a)?;
+                        *a = ListAggregatedRecordingsRow::from(row);
+                    } else {  // append.
+                        if a.time.end != row.start {
+                            bail!("stream {} recording {} ends at {} but {} starts at {}",
+                                  stream_id, a.ids.end - 1, a.time.end, row.id, row.start);
+                        }
+                        if a.open_id != row.open_id {
+                            bail!("stream {} recording {} has open id {} but {} has {}",
+                                  stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id);
+                        }
+                        a.time.end.0 += row.duration_90k as i64;
+                        a.ids.end = recording_id + 1;
+                        a.video_samples += row.video_samples as i64;
+                        a.video_sync_samples += row.video_sync_samples as i64;
+                        a.sample_file_bytes += row.sample_file_bytes as i64;
+                        if uncommitted {
+                            a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
+                        }
+                        a.growing = growing;
                     }
-                    if a.open_id != row.open_id {
-                        bail!("stream {} recording {} has open id {}; {} has {}; expected same",
-                              stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id);
-                    }
-                    a.time.end.0 += row.duration_90k as i64;
-                    a.ids.end = recording_id + 1;
-                    a.video_samples += row.video_samples as i64;
-                    a.video_sync_samples += row.video_sync_samples as i64;
-                    a.sample_file_bytes += row.sample_file_bytes as i64;
-                    if uncommitted {
-                        a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
-                    }
-                    a.growing = growing;
                 },
-                Entry::Vacant(e) => {
-                    e.insert(ListAggregatedRecordingsRow {
-                        time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
-                        ids: recording_id .. recording_id+1,
-                        video_samples: row.video_samples as i64,
-                        video_sync_samples: row.video_sync_samples as i64,
-                        sample_file_bytes: row.sample_file_bytes as i64,
-                        video_sample_entry_id: row.video_sample_entry_id,
-                        stream_id,
-                        run_start_id,
-                        open_id: row.open_id,
-                        first_uncommitted: if uncommitted { Some(recording_id) } else { None },
-                        growing,
-                    });
-                },
-            };
+                Entry::Vacant(e) => { e.insert(ListAggregatedRecordingsRow::from(row)); },
+            }
             Ok(())
         })?;
         for a in aggs.values() {
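
The adjust_day hunk above is the pattern the subject line refers to: under the old lexical borrow checker, the mutable borrow taken by e.get_mut() lasted to the end of its enclosing scope, so the removal decision had to be computed in an inner block and carried out through an intermediate bool; with non-lexical lifetimes the borrow ends at its last use, so e.remove_entry() can follow the check directly. Below is a minimal standalone sketch of that shape against the standard BTreeMap entry API; the DayTotal struct and the adjust helper are invented here for illustration and are not the types in db.rs.

use std::collections::BTreeMap;
use std::collections::btree_map::Entry;

// Hypothetical stand-in for the per-day totals tracked in db.rs; not the real type.
struct DayTotal {
    recordings: i64,
    duration: i64,
}

// Same shape as adjust_day after the patch: with NLL, the mutable borrow from
// e.get_mut() ends at its last use (the `v.recordings == 0` check), so
// e.remove_entry() may consume the entry directly afterward.
fn adjust(m: &mut BTreeMap<String, DayTotal>, day: String, recordings: i64, duration: i64) {
    match m.entry(day) {
        Entry::Vacant(e) => { e.insert(DayTotal { recordings, duration }); },
        Entry::Occupied(mut e) => {
            let v = e.get_mut();
            v.recordings += recordings;
            v.duration += duration;
            if v.recordings == 0 {
                e.remove_entry();  // pre-NLL this needed the intermediate `remove` bool.
            }
        },
    }
}

fn main() {
    let mut m = BTreeMap::new();
    adjust(&mut m, "2018-12-28".to_owned(), 1, 90_000);
    adjust(&mut m, "2018-12-28".to_owned(), -1, -90_000);
    assert!(m.is_empty());  // the entry hit zero recordings and was removed.
}

The same entry-based structure is what gets list_aggregate_recordings down to one btree operation per row: a single aggs.entry(run_start_id) call replaces the earlier get, possible remove, and entry sequence, and the occupied arm either appends to the aggregate in place or flushes it and overwrites it with ListAggregatedRecordingsRow::from(row).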