NLL-inspired simplifications to db.rs

* remove intermediate bool from adjust_day.

* rewrite LockedDatabase::list_aggregate_recordings.
  I started by collapsing the flush into the first part of the if, in a
  similar way to adjust_day. But then I refactored more and ended up
  with a structure that probably would have been allowed with the old
  lexical borrow checker. I think it's more readable, and it does 1
  btree operation per row where before it did 2 or 3.
This commit is contained in:
Scott Lamb 2018-12-28 15:06:32 -06:00
parent 699ec87968
commit 0b0f4ec9ed

View File

@ -166,6 +166,27 @@ pub struct ListAggregatedRecordingsRow {
pub growing: bool,
}
impl ListAggregatedRecordingsRow {
fn from(row: ListRecordingsRow) -> Self {
let recording_id = row.id.recording();
let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
ListAggregatedRecordingsRow {
time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
ids: recording_id .. recording_id+1,
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry_id: row.video_sample_entry_id,
stream_id: row.id.stream(),
run_start_id: recording_id - row.run_offset,
open_id: row.open_id,
first_uncommitted: if uncommitted { Some(recording_id) } else { None },
growing,
}
}
}
/// Select fields from the `recordings_playback` table. Retrieve with `with_recording_playback`.
#[derive(Debug)]
pub struct RecordingPlayback<'a> {
@ -454,13 +475,10 @@ fn adjust_day(day: StreamDayKey, delta: StreamDayValue,
match m.entry(day) {
Entry::Vacant(e) => { e.insert(delta); },
Entry::Occupied(mut e) => {
let remove = {
let v = e.get_mut();
v.recordings += delta.recordings;
v.duration += delta.duration;
v.recordings == 0
};
if remove {
if v.recordings == 0 {
e.remove_entry();
}
},
@ -1124,30 +1142,28 @@ impl LockedDatabase {
self.list_recordings_by_time(stream_id, desired_time, &mut |row| {
let recording_id = row.id.recording();
let run_start_id = recording_id - row.run_offset;
let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64);
a.ids.end != recording_id || row.video_sample_entry_id != a.video_sample_entry_id ||
new_dur >= forced_split
} else {
false
};
if needs_flush {
let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None");
f(&a)?;
}
let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
use std::collections::btree_map::Entry;
match aggs.entry(run_start_id) {
Entry::Occupied(mut e) => {
let a = e.get_mut();
let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64);
let needs_flush =
a.ids.end != recording_id ||
row.video_sample_entry_id != a.video_sample_entry_id ||
new_dur >= forced_split;
if needs_flush { // flush then start a new entry.
f(a)?;
*a = ListAggregatedRecordingsRow::from(row);
} else { // append.
if a.time.end != row.start {
bail!("stream {} recording {} ends at {}; {} starts at {}; expected same",
bail!("stream {} recording {} ends at {} but {} starts at {}",
stream_id, a.ids.end - 1, a.time.end, row.id, row.start);
}
if a.open_id != row.open_id {
bail!("stream {} recording {} has open id {}; {} has {}; expected same",
bail!("stream {} recording {} has open id {} but {} has {}",
stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id);
}
a.time.end.0 += row.duration_90k as i64;
@ -1159,23 +1175,10 @@ impl LockedDatabase {
a.first_uncommitted = a.first_uncommitted.or(Some(recording_id));
}
a.growing = growing;
}
},
Entry::Vacant(e) => {
e.insert(ListAggregatedRecordingsRow {
time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
ids: recording_id .. recording_id+1,
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry_id: row.video_sample_entry_id,
stream_id,
run_start_id,
open_id: row.open_id,
first_uncommitted: if uncommitted { Some(recording_id) } else { None },
growing,
});
},
};
Entry::Vacant(e) => { e.insert(ListAggregatedRecordingsRow::from(row)); },
}
Ok(())
})?;
for a in aggs.values() {