fix repeated deletions within a flush

When list_oldest_recordings was called twice with no intervening flush, it
returned the same rows both times. This led to attempting to delete them twice,
after which every following flush failed with a "no such recording x/y"
message. Now, return each row only once, and track how many bytes have been
queued for deletion.
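
A minimal sketch of the fixed bookkeeping, with hypothetical simplified types
(the real code keys rows by CompositeId and reads them from SQLite via
raw::list_oldest_recordings; see the db.rs diff below):

// Hypothetical simplified types; illustration only.
#[derive(Copy, Clone, Debug)]
struct Row { recording_id: i32, sample_file_bytes: i32 }

#[derive(Default)]
struct Stream {
    to_delete: Vec<Row>,   // oldest recordings queued for deletion at next flush
    bytes_to_delete: i64,  // running total of their sizes
}

impl Stream {
    /// Queues rows for deletion, resuming just past anything already queued,
    /// so a repeated call before a flush can't return the same rows again.
    fn delete_oldest(&mut self, rows: &[Row]) {
        let start = self.to_delete.last().map_or(0, |r| r.recording_id + 1);
        for r in rows.iter().filter(|r| r.recording_id >= start) {
            self.to_delete.push(*r);
            self.bytes_to_delete += r.sample_file_bytes as i64;
        }
    }
}

fn main() {
    let rows = [Row { recording_id: 0, sample_file_bytes: 42 }];
    let mut s = Stream::default();
    s.delete_oldest(&rows);
    s.delete_oldest(&rows); // second call before a flush: nothing is re-queued
    assert_eq!(s.to_delete.len(), 1);
    assert_eq!(s.bytes_to_delete, 42);
}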

I think dir.rs's logic for deciding how many bytes to delete is still wrong
when multiple recordings are flushed at once (it ignores the bytes added by
the first when computing the bytes to delete for the second), but this is
progress.
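
A hypothetical worked example of that suspected flaw, with invented numbers
plugged into the bytes_needed formula from dir.rs's delete_recordings below
(retain_bytes = 100, 95 committed bytes, two 10-byte recordings flushed
together):

fn main() {
    // bytes_needed = sample_file_bytes - bytes_to_delete + extra - retain_bytes,
    // where sample_file_bytes counts committed data only.
    let first = 95 - 0 + 10 - 100;  // = 5: roughly 5 bytes get queued for deletion
    // The second computation still sees 95 committed bytes; the 10 bytes the
    // first recording will add are uncommitted and invisible to it.
    let second = 95 - 5 + 10 - 100; // = 0: nothing further is queued
    assert_eq!((first, second), (5, 0));
    // After the flush commits both recordings: 95 + 20 - 5 = 110 > 100.
}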
Scott Lamb 2018-02-23 13:35:25 -08:00
parent 843e1b49c8
commit 8d9939603e
3 changed files with 198 additions and 151 deletions

db/db.rs

@@ -96,21 +96,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
"update stream set next_recording_id = :next_recording_id where id = :stream_id";
const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#"
select
composite_id,
start_time_90k,
duration_90k,
sample_file_bytes
from
recording
where
:start <= composite_id and
composite_id < :end
order by
composite_id
"#;
const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
select
recording.composite_id,
@@ -223,12 +208,12 @@ pub(crate) struct RecordingToInsert {
pub sample_file_sha1: [u8; 20],
}
/// A row used in `list_oldest_sample_files`.
#[derive(Debug)]
pub(crate) struct ListOldestSampleFilesRow {
/// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`.
#[derive(Copy, Clone, Debug)]
pub(crate) struct ListOldestRecordingsRow {
pub id: CompositeId,
pub sample_file_dir_id: i32,
pub time: Range<recording::Time>,
pub start: recording::Time,
pub duration: i32,
pub sample_file_bytes: i32,
}
@@ -371,6 +356,12 @@ pub struct Stream {
pub range: Option<Range<recording::Time>>,
pub sample_file_bytes: i64,
/// On flush, delete the following recordings. Note they must be the oldest recordings.
to_delete: Vec<ListOldestRecordingsRow>,
/// The total bytes to delete.
pub bytes_to_delete: i64,
/// The total duration of recorded data. This may not be `range.end - range.start` due to
/// gaps and overlap.
pub duration: recording::Duration,
@@ -578,10 +569,6 @@ pub struct LockedDatabase {
list_recordings_by_time_sql: String,
video_index_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
on_flush: Vec<Box<Fn() + Send>>,
/// Recordings which have been enqueued for deletion via `LockedDatabase::delete_recordings`
/// but have yet to be committed.
to_delete: Vec<ListOldestSampleFilesRow>,
}
/// Represents a row of the `open` database table.
@@ -719,6 +706,8 @@ impl StreamStateChanger {
flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC),
range: None,
sample_file_bytes: 0,
to_delete: Vec::new(),
bytes_to_delete: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
record: sc.record,
@@ -779,10 +768,6 @@ impl LockedDatabase {
Ok((id, recording))
}
pub(crate) fn delete_recordings(&mut self, rows: &mut Vec<ListOldestSampleFilesRow>) {
self.to_delete.append(rows);
}
pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec<CompositeId>)
-> Result<(), Error> {
let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) {
@@ -797,7 +782,7 @@
///
/// * commits any recordings added with `add_recording` that have since been marked as
/// synced.
/// * moves old recordings to the garbage table as requested by `delete_recordings`.
/// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`.
/// * removes entries from the garbage table as requested by `mark_sample_files_deleted`.
///
/// On success, for each affected sample file directory with a flush watcher set, sends a
@@ -810,20 +795,10 @@ impl LockedDatabase {
let tx = self.conn.transaction()?;
let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
Default::default());
raw::delete_recordings(&tx, &self.to_delete)?;
for row in &self.to_delete {
// Add a placeholder for recomputing the range.
mods.entry(row.id.stream()).or_insert_with(StreamModification::default);
let dir = match self.sample_file_dirs_by_id.get_mut(&row.sample_file_dir_id) {
None => bail!("Row refers to nonexistent sample file dir: {:#?}", row),
Some(d) => d,
};
dir.garbage.insert(row.id);
}
{
let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
for (&stream_id, s) in &self.streams_by_id {
// Process additions.
let mut i = 0;
for recording in &s.uncommitted {
let l = recording.lock();
@@ -842,6 +817,18 @@ impl LockedDatabase {
(":next_recording_id", &(s.next_recording_id + i)),
])?;
}
// Process deletions.
if let Some(l) = s.to_delete.last() {
// Add a placeholder for recomputing the range.
mods.entry(stream_id).or_insert_with(StreamModification::default);
let dir = match s.sample_file_dir_id {
None => bail!("stream {} has no directory!", stream_id),
Some(d) => d,
};
let end = CompositeId(l.id.0 + 1);
raw::delete_recordings(&tx, dir, CompositeId::new(stream_id, 0) .. end)?;
}
}
}
for dir in self.sample_file_dirs_by_id.values() {
@@ -852,15 +839,6 @@ impl LockedDatabase {
}
tx.commit()?;
// Process delete_recordings.
let deleted = self.to_delete.len();
for row in self.to_delete.drain(..) {
let s = self.streams_by_id.get_mut(&row.id.stream()).unwrap();
s.duration -= row.time.end - row.time.start;
s.sample_file_bytes -= row.sample_file_bytes as i64;
adjust_days(row.time, -1, &mut s.days);
}
// Process delete_garbage.
let mut gced = 0;
for dir in self.sample_file_dirs_by_id.values_mut() {
@@ -870,10 +848,24 @@ impl LockedDatabase {
}
}
// Process add_recordings.
let mut added = 0;
let mut deleted = 0;
for (stream_id, m) in mods.drain() {
let s = self.streams_by_id.get_mut(&stream_id).unwrap();
let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
// Process delete_oldest_recordings.
deleted += s.to_delete.len();
s.sample_file_bytes -= s.bytes_to_delete;
s.bytes_to_delete = 0;
for row in s.to_delete.drain(..) {
d.garbage.insert(row.id);
let d = recording::Duration(row.duration as i64);
s.duration -= d;
adjust_days(row.start .. row.start + d, -1, &mut s.days);
}
// Process add_recordings.
s.next_recording_id += m.num_recordings_to_commit;
added += m.num_recordings_to_commit;
for _ in 0..m.num_recordings_to_commit {
@@ -883,6 +875,8 @@ impl LockedDatabase {
s.add_recording(r.time.clone(), r.sample_file_bytes);
}
}
// Fix the range.
s.range = m.range;
}
info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.",
@@ -1145,40 +1139,27 @@ impl LockedDatabase {
Err(format_err!("no such recording {}", id))
}
/// Lists the oldest sample files (to delete to free room).
/// `f` should return true as long as further rows are desired.
pub(crate) fn list_oldest_sample_files(
&self, stream_id: i32, f: &mut FnMut(ListOldestSampleFilesRow) -> bool)
/// Deletes the oldest recordings that aren't already queued for deletion.
/// `f` should return true for each row that should be deleted.
pub(crate) fn delete_oldest_recordings(
&mut self, stream_id: i32, f: &mut FnMut(&ListOldestRecordingsRow) -> bool)
-> Result<(), Error> {
let s = match self.streams_by_id.get(&stream_id) {
let s = match self.streams_by_id.get_mut(&stream_id) {
None => bail!("no stream {}", stream_id),
Some(s) => s,
};
let sample_file_dir_id = match s.sample_file_dir_id {
None => bail!("stream {} has no dir", stream_id),
Some(d) => d,
let end = match s.to_delete.last() {
None => 0,
Some(row) => row.id.recording() + 1,
};
let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?;
let mut rows = stmt.query_named(&[
(":start", &CompositeId::new(stream_id, 0).0),
(":end", &CompositeId::new(stream_id + 1, 0).0),
])?;
while let Some(row) = rows.next() {
let row = row?;
let id = CompositeId(row.get_checked(0)?);
let start = recording::Time(row.get_checked(1)?);
let duration = recording::Duration(row.get_checked(2)?);
let should_continue = f(ListOldestSampleFilesRow{
id,
sample_file_dir_id,
time: start .. start + duration,
sample_file_bytes: row.get_checked(3)?,
});
if !should_continue {
break;
raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
if f(&r) {
s.to_delete.push(r);
s.bytes_to_delete += r.sample_file_bytes as i64;
return true;
}
}
Ok(())
false
})
}
/// Initializes the video_sample_entries. To be called during construction.
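
For context, a sketch of the CompositeId packing that these half-open id ranges
rely on, reconstructed from how new(), stream(), and recording() are used in
this diff (the real type lives in db.rs): the stream id occupies the upper 32
bits and the per-stream recording id the lower 32, so one stream's recordings
form a single contiguous i64 range and "resume after the last queued row" is
just id + 1.

// Reconstructed for illustration; see db.rs for the real definition.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
struct CompositeId(i64);

impl CompositeId {
    fn new(stream_id: i32, recording_id: i32) -> Self {
        CompositeId(((stream_id as i64) << 32) | recording_id as i64)
    }
    fn stream(self) -> i32 { (self.0 >> 32) as i32 }
    fn recording(self) -> i32 { self.0 as i32 }
}

fn main() {
    let id = CompositeId::new(5, 7);
    assert_eq!((id.stream(), id.recording()), (5, 7));
    // All of stream 5's recordings fall in one contiguous half-open range,
    // which is what ":start <= composite_id and composite_id < :end" exploits.
    let (start, end) = (CompositeId::new(5, 0), CompositeId::new(6, 0));
    assert!(start <= id && id < end);
}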
@@ -1341,6 +1322,8 @@ impl LockedDatabase {
flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC),
range: None,
sample_file_bytes: 0,
to_delete: Vec::new(),
bytes_to_delete: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
next_recording_id: row.get_checked(7)?,
@@ -1717,7 +1700,6 @@ impl Database {
video_sample_entries: BTreeMap::new(),
video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_by_time_sql,
to_delete: Vec::new(),
on_flush: Vec::new(),
})));
{
@@ -1847,10 +1829,11 @@ mod tests {
assert_eq!(1, rows);
rows = 0;
db.lock().list_oldest_sample_files(stream_id, &mut |row| {
raw::list_oldest_recordings(&db.lock().conn, CompositeId::new(stream_id, 0), &mut |row| {
rows += 1;
assert_eq!(recording_id, Some(row.id));
assert_eq!(r.time, row.time);
assert_eq!(r.time.start, row.start);
assert_eq!(r.time.end - r.time.start, recording::Duration(row.duration as i64));
assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
true
}).unwrap();
@@ -2053,11 +2036,24 @@ mod tests {
// Deleting a recording should succeed, update the min/max times, and mark it as garbage.
{
let mut db = db.lock();
let mut v = Vec::new();
db.list_oldest_sample_files(stream_id, &mut |r| { v.push(r); true }).unwrap();
assert_eq!(1, v.len());
db.delete_recordings(&mut v);
let mut n = 0;
db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap();
assert_eq!(n, 1);
{
let s = db.streams_by_id().get(&stream_id).unwrap();
assert_eq!(s.sample_file_bytes, 42);
assert_eq!(s.bytes_to_delete, 42);
}
n = 0;
// A second run should queue nothing further.
db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap();
assert_eq!(n, 0);
assert_eq!(db.streams_by_id().get(&stream_id).unwrap().bytes_to_delete, 42);
db.flush("delete test").unwrap();
let s = db.streams_by_id().get(&stream_id).unwrap();
assert_eq!(s.sample_file_bytes, 0);
assert_eq!(s.bytes_to_delete, 0);
}
assert_no_recordings(&db, camera_uuid);
let g: Vec<_> = db.lock()

db/dir.rs

@@ -357,43 +357,52 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
let db2 = db.clone();
let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
syncer.do_rotation(|db| {
let mut to_delete = Vec::new();
for l in limits {
let before = to_delete.len();
let stream = db.streams_by_id().get(&l.stream_id)
.ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
if l.limit >= stream.sample_file_bytes { continue }
get_rows_to_delete(db, l.stream_id, stream, stream.retain_bytes - l.limit,
&mut to_delete)?;
info!("stream {}, {}->{}, deleting {} rows", stream.id,
stream.sample_file_bytes, l.limit, to_delete.len() - before);
let (bytes_before, extra);
{
let stream = db.streams_by_id().get(&l.stream_id)
.ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
bytes_before = stream.sample_file_bytes - stream.bytes_to_delete;
extra = stream.retain_bytes - l.limit;
}
if l.limit >= bytes_before { continue }
delete_recordings(db, l.stream_id, extra)?;
let stream = db.streams_by_id().get(&l.stream_id).unwrap();
info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
stream.sample_file_bytes - stream.bytes_to_delete);
}
Ok(to_delete)
Ok(())
})
}
/// Gets rows to delete to bring a stream's disk usage within bounds.
fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32,
stream: &db::Stream, extra_bytes_needed: i64,
to_delete: &mut Vec<db::ListOldestSampleFilesRow>) -> Result<(), Error> {
let bytes_needed = stream.sample_file_bytes + extra_bytes_needed - stream.retain_bytes;
/// Deletes recordings to bring a stream's disk usage within bounds.
fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
extra_bytes_needed: i64) -> Result<(), Error> {
let bytes_needed = {
let stream = match db.streams_by_id().get(&stream_id) {
None => bail!("no stream {}", stream_id),
Some(s) => s,
};
stream.sample_file_bytes - stream.bytes_to_delete + extra_bytes_needed
- stream.retain_bytes
};
let mut bytes_to_delete = 0;
if bytes_needed <= 0 {
debug!("{}: have remaining quota of {}", stream.id, -bytes_needed);
debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
return Ok(());
}
let mut n = 0;
db.list_oldest_sample_files(stream_id, &mut |row| {
bytes_to_delete += row.sample_file_bytes as i64;
to_delete.push(row);
db.delete_oldest_recordings(stream_id, &mut |row| {
n += 1;
bytes_needed > bytes_to_delete // continue as long as more deletions are needed.
if bytes_needed >= bytes_to_delete {
bytes_to_delete += row.sample_file_bytes as i64;
n += 1;
return true;
}
false
})?;
if bytes_needed > bytes_to_delete {
bail!("{}: couldn't find enough files to delete: {} left.", stream.id, bytes_needed);
}
info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
stream.id, bytes_to_delete, n, bytes_needed);
stream_id, bytes_to_delete, n, bytes_needed);
Ok(())
}
@@ -498,20 +507,19 @@ impl Syncer {
/// Rotates files for all streams and deletes stale files from previous runs.
fn initial_rotation(&mut self) -> Result<(), Error> {
self.do_rotation(|db| {
let mut to_delete = Vec::new();
for (stream_id, stream) in db.streams_by_id() {
get_rows_to_delete(&db, *stream_id, stream, 0, &mut to_delete)?;
let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
for &stream_id in &streams {
delete_recordings(db, stream_id, 0)?;
}
Ok(to_delete)
Ok(())
})
}
fn do_rotation<F>(&mut self, get_rows_to_delete: F) -> Result<(), Error>
where F: FnOnce(&db::LockedDatabase) -> Result<Vec<db::ListOldestSampleFilesRow>, Error> {
fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> {
{
let mut db = self.db.lock();
let mut to_delete = get_rows_to_delete(&*db)?;
db.delete_recordings(&mut to_delete);
delete_recordings(&mut *db)?;
db.flush("synchronous deletion")?;
}
self.collect_garbage(false)?;
@@ -576,16 +584,7 @@ impl Syncer {
let stream_id = id.stream();
// Free up a like number of bytes.
{
let mut to_delete = Vec::new();
let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64;
let mut db = self.db.lock();
{
let stream = db.streams_by_id().get(&stream_id).unwrap();
get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap();
}
db.delete_recordings(&mut to_delete);
}
delete_recordings(&mut self.db.lock(), stream_id, 0).unwrap();
f.sync_all().unwrap();
self.dir.sync().unwrap();

db/raw.rs

@@ -72,6 +72,21 @@ const STREAM_MAX_START_SQL: &'static str = r#"
order by start_time_90k desc;
"#;
const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#"
select
composite_id,
start_time_90k,
duration_90k,
sample_file_bytes
from
recording
where
:start <= composite_id and
composite_id < :end
order by
composite_id
"#;
/// Inserts the specified recording (called from `try_flush` only).
pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
r: &db::RecordingToInsert) -> Result<(), Error> {
@@ -105,33 +120,45 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
Ok(())
}
/// Deletes the given recordings from the `recording` and `recording_playback` tables.
/// Note they are not fully removed from the database; the ids are transferred to the
/// `garbage` table.
pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, rows: &[db::ListOldestSampleFilesRow])
/// Transfers the given recording range from the `recording` and `recording_playback` tables to the
/// `garbage` table. `sample_file_dir_id` is assumed to be correct.
pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, sample_file_dir_id: i32,
ids: Range<CompositeId>)
-> Result<(), Error> {
let mut del1 = tx.prepare_cached(
"delete from recording_playback where composite_id = :composite_id")?;
let mut del2 = tx.prepare_cached(
"delete from recording where composite_id = :composite_id")?;
let mut insert = tx.prepare_cached(r#"
insert into garbage (sample_file_dir_id, composite_id)
values (:sample_file_dir_id, :composite_id)
insert into garbage (sample_file_dir_id, composite_id)
select
:sample_file_dir_id,
composite_id
from
recording
where
:start <= composite_id and
composite_id < :end
"#)?;
for row in rows {
let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?;
if changes != 1 {
bail!("no such recording_playback {}", row.id);
}
let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?;
if changes != 1 {
bail!("no such recording {}", row.id);
}
insert.execute_named(&[
(":sample_file_dir_id", &row.sample_file_dir_id),
(":composite_id", &row.id.0)],
)?;
}
let mut del1 = tx.prepare_cached(r#"
delete from recording_playback
where
:start <= composite_id and
composite_id < :end
"#)?;
let mut del2 = tx.prepare_cached(r#"
delete from recording
where
:start <= composite_id and
composite_id < :end
"#)?;
insert.execute_named(&[
(":sample_file_dir_id", &sample_file_dir_id),
(":start", &ids.start.0),
(":end", &ids.end.0),
])?;
let p: &[(&str, &rusqlite::types::ToSql)] = &[
(":start", &ids.start.0),
(":end", &ids.end.0),
];
del1.execute_named(p)?;
del2.execute_named(p)?;
Ok(())
}
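
One ordering detail in the rewritten delete_recordings above: the garbage
insert is a select from recording, so it must execute before either delete
statement. A toy model with Vecs standing in for the tables (hypothetical ids,
not the real SQL):

fn main() {
    let mut recording = vec![1i64, 2, 3];   // stand-in for the recording table
    let mut garbage: Vec<i64> = Vec::new();
    let (start, end) = (1i64, 3i64);        // half-open range, as in the SQL

    // 1. insert into garbage ... select ... from recording where :start <= id < :end
    garbage.extend(recording.iter().copied().filter(|&id| start <= id && id < end));
    // 2./3. delete from recording_playback / recording over the same range;
    // running these first would leave the select with nothing to copy.
    recording.retain(|&id| !(start <= id && id < end));

    assert_eq!(garbage, vec![1, 2]);
    assert_eq!(recording, vec![3]);
}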
@@ -202,3 +229,28 @@ pub(crate) fn list_garbage(conn: &rusqlite::Connection, dir_id: i32)
}
Ok(garbage)
}
/// Lists the oldest recordings for a stream, starting with the given id.
/// `f` should return true as long as further rows are desired.
pub(crate) fn list_oldest_recordings(conn: &rusqlite::Connection, start: CompositeId,
f: &mut FnMut(db::ListOldestRecordingsRow) -> bool)
-> Result<(), Error> {
let mut stmt = conn.prepare_cached(LIST_OLDEST_RECORDINGS_SQL)?;
let mut rows = stmt.query_named(&[
(":start", &start.0),
(":end", &CompositeId::new(start.stream() + 1, 0).0),
])?;
while let Some(row) = rows.next() {
let row = row?;
let should_continue = f(db::ListOldestRecordingsRow {
id: CompositeId(row.get_checked(0)?),
start: recording::Time(row.get_checked(1)?),
duration: row.get_checked(2)?,
sample_file_bytes: row.get_checked(3)?,
});
if !should_continue {
break;
}
}
Ok(())
}