From 8d9939603ef91d36160686f4776c1038da9916aa Mon Sep 17 00:00:00 2001
From: Scott Lamb <slamb@slamb.org>
Date: Fri, 23 Feb 2018 13:35:25 -0800
Subject: [PATCH] fix repeated deletions within a flush

When list_oldest_recordings was called twice with no intervening flush,
it returned the same rows twice. This led to trying to delete the same
recordings twice, and then to every following flush failing with a
"no such recording x/y" message. Now, return each row only once, and
track how many bytes have been returned.

I think dir.rs's logic is still wrong about how many bytes to delete
when multiple recordings are flushed at once (it ignores the bytes
added by the first when computing the bytes to delete for the second),
but this is progress.
---
 db/db.rs  | 166 ++++++++++++++++++++++++++----------------------------
 db/dir.rs |  84 ++++++++++++++-------------
 db/raw.rs | 100 ++++++++++++++++++++++++--------
 3 files changed, 198 insertions(+), 152 deletions(-)

diff --git a/db/db.rs b/db/db.rs
index a0f71ec..8ecafd7 100644
--- a/db/db.rs
+++ b/db/db.rs
@@ -96,21 +96,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
 const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
     "update stream set next_recording_id = :next_recording_id where id = :stream_id";
 
-const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#"
-    select
-      composite_id,
-      start_time_90k,
-      duration_90k,
-      sample_file_bytes
-    from
-      recording
-    where
-      :start <= composite_id and
-      composite_id < :end
-    order by
-      composite_id
-"#;
-
 const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
     select
         recording.composite_id,
@@ -223,12 +208,12 @@ pub(crate) struct RecordingToInsert {
     pub sample_file_sha1: [u8; 20],
 }
 
-/// A row used in `list_oldest_sample_files`.
-#[derive(Debug)]
-pub(crate) struct ListOldestSampleFilesRow {
+/// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`.
+#[derive(Copy, Clone, Debug)]
+pub(crate) struct ListOldestRecordingsRow {
     pub id: CompositeId,
-    pub sample_file_dir_id: i32,
-    pub time: Range<recording::Time>,
+    pub start: recording::Time,
+    pub duration: i32,
     pub sample_file_bytes: i32,
 }
 
@@ -371,6 +356,12 @@ pub struct Stream {
     pub range: Option<Range<recording::Time>>,
     pub sample_file_bytes: i64,
 
+    /// On flush, delete the following recordings. Note they must be the oldest recordings.
+    to_delete: Vec<ListOldestRecordingsRow>,
+
+    /// The total bytes to delete.
+    pub bytes_to_delete: i64,
+
     /// The total duration of recorded data. This may not be `range.end - range.start` due to
     /// gaps and overlap.
     pub duration: recording::Duration,
@@ -578,10 +569,6 @@ pub struct LockedDatabase {
     list_recordings_by_time_sql: String,
     video_index_cache: RefCell<LruCache<i64, Box<[u8]>, fnv::FnvBuildHasher>>,
     on_flush: Vec<Box<Fn() + Send>>,
-
-    /// Recordings which have been enqueued for deletion via `LockedDatabase::delete_recordings`
-    /// but have yet to be committed.
-    to_delete: Vec<ListOldestSampleFilesRow>,
 }
 
 /// Represents a row of the `open` database table.
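For reference while reading the id ranges used below (`CompositeId::new(stream_id, 0) .. end`
in flush, `row.id.recording() + 1` in delete_oldest_recordings): a sketch of how `CompositeId`
packs a stream id and a recording id, roughly as it is defined elsewhere in db.rs (a
reconstruction for illustration, not part of this diff):

    // Assumed shape of CompositeId; the real definition lives in db/db.rs.
    #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
    pub struct CompositeId(pub i64);

    impl CompositeId {
        pub fn new(stream_id: i32, recording_id: i32) -> Self {
            // Upper 32 bits: stream id. Lower 32 bits: recording id.
            CompositeId((stream_id as i64) << 32 | recording_id as i64)
        }
        pub fn stream(self) -> i32 { (self.0 >> 32) as i32 }
        pub fn recording(self) -> i32 { self.0 as i32 }
    }

    fn main() {
        let id = CompositeId::new(1, 41);
        assert_eq!((id.stream(), id.recording()), (1, 41));
        // The id directly after `id`, as flush uses for its exclusive range end.
        assert_eq!(CompositeId(id.0 + 1).recording(), 42);
    }

Because this packing is order-preserving, one stream's recordings sort contiguously by
composite id, so a half-open id range can describe "all recordings queued for deletion"
without naming each row.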
@@ -719,6 +706,8 @@ impl StreamStateChanger {
                 flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC),
                 range: None,
                 sample_file_bytes: 0,
+                to_delete: Vec::new(),
+                bytes_to_delete: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 record: sc.record,
@@ -779,10 +768,6 @@ impl LockedDatabase {
         Ok((id, recording))
     }
 
-    pub(crate) fn delete_recordings(&mut self, rows: &mut Vec<ListOldestSampleFilesRow>) {
-        self.to_delete.append(rows);
-    }
-
     pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec<CompositeId>)
                                  -> Result<(), Error> {
         let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) {
@@ -797,7 +782,7 @@ impl LockedDatabase {
     ///
     /// * commits any recordings added with `add_recording` that have since been marked as
     ///   synced.
-    /// * moves old recordings to the garbage table as requested by `delete_recordings`.
+    /// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`.
     /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`.
     ///
     /// On success, for each affected sample file directory with a flush watcher set, sends a
@@ -810,20 +795,10 @@ impl LockedDatabase {
         let tx = self.conn.transaction()?;
         let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(),
                                                             Default::default());
-        raw::delete_recordings(&tx, &self.to_delete)?;
-        for row in &self.to_delete {
-            // Add a placeholder for recomputing the range.
-            mods.entry(row.id.stream()).or_insert_with(StreamModification::default);
-
-            let dir = match self.sample_file_dirs_by_id.get_mut(&row.sample_file_dir_id) {
-                None => bail!("Row refers to nonexistent sample file dir: {:#?}", row),
-                Some(d) => d,
-            };
-            dir.garbage.insert(row.id);
-        }
         {
             let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
             for (&stream_id, s) in &self.streams_by_id {
+                // Process additions.
                 let mut i = 0;
                 for recording in &s.uncommitted {
                     let l = recording.lock();
@@ -842,6 +817,18 @@ impl LockedDatabase {
                         (":next_recording_id", &(s.next_recording_id + i)),
                     ])?;
                 }
+
+                // Process deletions.
+                if let Some(l) = s.to_delete.last() {
+                    // Add a placeholder for recomputing the range.
+                    mods.entry(stream_id).or_insert_with(StreamModification::default);
+                    let dir = match s.sample_file_dir_id {
+                        None => bail!("stream {} has no directory!", stream_id),
+                        Some(d) => d,
+                    };
+                    let end = CompositeId(l.id.0 + 1);
+                    raw::delete_recordings(&tx, dir, CompositeId::new(stream_id, 0) .. end)?;
+                }
             }
         }
         for dir in self.sample_file_dirs_by_id.values() {
@@ -852,15 +839,6 @@ impl LockedDatabase {
         }
         tx.commit()?;
 
-        // Process delete_recordings.
-        let deleted = self.to_delete.len();
-        for row in self.to_delete.drain(..) {
-            let s = self.streams_by_id.get_mut(&row.id.stream()).unwrap();
-            s.duration -= row.time.end - row.time.start;
-            s.sample_file_bytes -= row.sample_file_bytes as i64;
-            adjust_days(row.time, -1, &mut s.days);
-        }
-
         // Process delete_garbage.
         let mut gced = 0;
         for dir in self.sample_file_dirs_by_id.values_mut() {
@@ -870,10 +848,24 @@ impl LockedDatabase {
             }
         }
 
-        // Process add_recordings.
         let mut added = 0;
+        let mut deleted = 0;
        for (stream_id, m) in mods.drain() {
             let s = self.streams_by_id.get_mut(&stream_id).unwrap();
+            let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap();
+
+            // Process delete_oldest_recordings.
+            deleted += s.to_delete.len();
+            s.sample_file_bytes -= s.bytes_to_delete;
+            s.bytes_to_delete = 0;
+            for row in s.to_delete.drain(..) {
+                d.garbage.insert(row.id);
+                let d = recording::Duration(row.duration as i64);
+                s.duration -= d;
+                adjust_days(row.start .. row.start + d, -1, &mut s.days);
+            }
+
+            // Process add_recordings.
             s.next_recording_id += m.num_recordings_to_commit;
             added += m.num_recordings_to_commit;
             for _ in 0..m.num_recordings_to_commit {
@@ -883,6 +875,8 @@ impl LockedDatabase {
                 s.add_recording(r.time.clone(), r.sample_file_bytes);
             }
         }
+
+        // Fix the range.
         s.range = m.range;
     }
     info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.",
@@ -1145,40 +1139,27 @@ impl LockedDatabase {
         Err(format_err!("no such recording {}", id))
     }
 
-    /// Lists the oldest sample files (to delete to free room).
-    /// `f` should return true as long as further rows are desired.
-    pub(crate) fn list_oldest_sample_files(
-        &self, stream_id: i32, f: &mut FnMut(ListOldestSampleFilesRow) -> bool)
+    /// Deletes the oldest recordings that aren't already queued for deletion.
+    /// `f` should return true for each row that should be deleted.
+    pub(crate) fn delete_oldest_recordings(
+        &mut self, stream_id: i32, f: &mut FnMut(&ListOldestRecordingsRow) -> bool)
         -> Result<(), Error> {
-        let s = match self.streams_by_id.get(&stream_id) {
+        let s = match self.streams_by_id.get_mut(&stream_id) {
             None => bail!("no stream {}", stream_id),
             Some(s) => s,
         };
-        let sample_file_dir_id = match s.sample_file_dir_id {
-            None => bail!("stream {} has no dir", stream_id),
-            Some(d) => d,
+        let end = match s.to_delete.last() {
+            None => 0,
+            Some(row) => row.id.recording() + 1,
         };
-        let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?;
-        let mut rows = stmt.query_named(&[
-            (":start", &CompositeId::new(stream_id, 0).0),
-            (":end", &CompositeId::new(stream_id + 1, 0).0),
-        ])?;
-        while let Some(row) = rows.next() {
-            let row = row?;
-            let id = CompositeId(row.get_checked(0)?);
-            let start = recording::Time(row.get_checked(1)?);
-            let duration = recording::Duration(row.get_checked(2)?);
-            let should_continue = f(ListOldestSampleFilesRow{
-                id,
-                sample_file_dir_id,
-                time: start .. start + duration,
-                sample_file_bytes: row.get_checked(3)?,
-            });
-            if !should_continue {
-                break;
+        raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| {
+            if f(&r) {
+                s.to_delete.push(r);
+                s.bytes_to_delete += r.sample_file_bytes as i64;
+                return true;
             }
-        }
-        Ok(())
+            false
+        })
     }
 
     /// Initializes the video_sample_entries. To be called during construction.
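The heart of the fix is in `delete_oldest_recordings` above: it resumes listing just past
`s.to_delete.last()`, so a second call before the next flush cannot see rows that are already
queued. A minimal self-contained model of that resume logic (simplified names and types for
illustration; this is not the crate's actual API):

    // Toy model: recordings are (id, bytes) pairs, oldest first.
    #[derive(Clone, Copy, Debug, PartialEq)]
    struct Row { id: i32, bytes: i64 }

    struct Stream {
        recordings: Vec<Row>,  // committed recordings, oldest first
        to_delete: Vec<Row>,   // queued for deletion at the next flush
        bytes_to_delete: i64,
    }

    impl Stream {
        fn delete_oldest(&mut self, f: &mut FnMut(&Row) -> bool) {
            // Resume just past the newest row already queued, as the patch
            // does with `s.to_delete.last()`.
            let start = match self.to_delete.last() {
                None => 0,
                Some(r) => r.id + 1,
            };
            for r in self.recordings.iter().filter(|r| r.id >= start) {
                if !f(r) { break; }
                self.to_delete.push(*r);
                self.bytes_to_delete += r.bytes;
            }
        }
    }

    fn main() {
        let mut s = Stream {
            recordings: vec![Row { id: 0, bytes: 42 }, Row { id: 1, bytes: 7 }],
            to_delete: Vec::new(),
            bytes_to_delete: 0,
        };
        s.delete_oldest(&mut |_| true);  // queues both rows
        let mut n = 0;
        s.delete_oldest(&mut |_| { n += 1; true });
        assert_eq!(n, 0);  // a second call before a flush sees nothing new
        assert_eq!(s.bytes_to_delete, 49);
    }

Before this patch, the second call would have returned the same rows again, queueing a double
deletion that made the following flush fail.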
@@ -1341,6 +1322,8 @@ impl LockedDatabase {
                 flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC),
                 range: None,
                 sample_file_bytes: 0,
+                to_delete: Vec::new(),
+                bytes_to_delete: 0,
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 next_recording_id: row.get_checked(7)?,
@@ -1717,7 +1700,6 @@ impl Database {
             video_sample_entries: BTreeMap::new(),
             video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
             list_recordings_by_time_sql,
-            to_delete: Vec::new(),
             on_flush: Vec::new(),
         })));
         {
@@ -1847,10 +1829,11 @@ mod tests {
         assert_eq!(1, rows);
 
         rows = 0;
-        db.lock().list_oldest_sample_files(stream_id, &mut |row| {
+        raw::list_oldest_recordings(&db.lock().conn, CompositeId::new(stream_id, 0), &mut |row| {
             rows += 1;
             assert_eq!(recording_id, Some(row.id));
-            assert_eq!(r.time, row.time);
+            assert_eq!(r.time.start, row.start);
+            assert_eq!(r.time.end - r.time.start, recording::Duration(row.duration as i64));
             assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
             true
         }).unwrap();
@@ -2053,11 +2036,24 @@
         // Deleting a recording should succeed, update the min/max times, and mark it as garbage.
         {
             let mut db = db.lock();
-            let mut v = Vec::new();
-            db.list_oldest_sample_files(stream_id, &mut |r| { v.push(r); true }).unwrap();
-            assert_eq!(1, v.len());
-            db.delete_recordings(&mut v);
+            let mut n = 0;
+            db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap();
+            assert_eq!(n, 1);
+            {
+                let s = db.streams_by_id().get(&stream_id).unwrap();
+                assert_eq!(s.sample_file_bytes, 42);
+                assert_eq!(s.bytes_to_delete, 42);
+            }
+            n = 0;
+
+            // A second run should find no additional recordings to delete.
+            db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap();
+            assert_eq!(n, 0);
+            assert_eq!(db.streams_by_id().get(&stream_id).unwrap().bytes_to_delete, 42);
             db.flush("delete test").unwrap();
+            let s = db.streams_by_id().get(&stream_id).unwrap();
+            assert_eq!(s.sample_file_bytes, 0);
+            assert_eq!(s.bytes_to_delete, 0);
         }
         assert_no_recordings(&db, camera_uuid);
 
         let g: Vec<_> = db.lock()
diff --git a/db/dir.rs b/db/dir.rs
index 5cdbbfb..8f07445 100644
--- a/db/dir.rs
+++ b/db/dir.rs
@@ -357,43 +357,51 @@ pub fn lower_retention(db: Arc<db::Database>, dir_id: i32, limits: &[NewLimit])
     let db2 = db.clone();
     let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?;
     syncer.do_rotation(|db| {
-        let mut to_delete = Vec::new();
         for l in limits {
-            let before = to_delete.len();
-            let stream = db.streams_by_id().get(&l.stream_id)
-                .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
-            if l.limit >= stream.sample_file_bytes { continue }
-            get_rows_to_delete(db, l.stream_id, stream, stream.retain_bytes - l.limit,
-                               &mut to_delete)?;
-            info!("stream {}, {}->{}, deleting {} rows", stream.id,
-                  stream.sample_file_bytes, l.limit, to_delete.len() - before);
+            let (bytes_before, extra);
+            {
+                let stream = db.streams_by_id().get(&l.stream_id)
+                    .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?;
+                bytes_before = stream.sample_file_bytes - stream.bytes_to_delete;
+                extra = stream.retain_bytes - l.limit;
+            }
+            if l.limit >= bytes_before { continue }
+            delete_recordings(db, l.stream_id, extra)?;
+            let stream = db.streams_by_id().get(&l.stream_id).unwrap();
+            info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before,
+                  stream.sample_file_bytes - stream.bytes_to_delete);
         }
-        Ok(to_delete)
+        Ok(())
     })
 }
 
-/// Gets rows to delete to bring a stream's disk usage within bounds.
-fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32,
-                      stream: &db::Stream, extra_bytes_needed: i64,
-                      to_delete: &mut Vec<db::ListOldestSampleFilesRow>) -> Result<(), Error> {
-    let bytes_needed = stream.sample_file_bytes + extra_bytes_needed - stream.retain_bytes;
+/// Deletes recordings to bring a stream's disk usage within bounds.
+fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32,
+                     extra_bytes_needed: i64) -> Result<(), Error> {
+    let bytes_needed = {
+        let stream = match db.streams_by_id().get(&stream_id) {
+            None => bail!("no stream {}", stream_id),
+            Some(s) => s,
+        };
+        stream.sample_file_bytes - stream.bytes_to_delete + extra_bytes_needed
+                - stream.retain_bytes
+    };
     let mut bytes_to_delete = 0;
     if bytes_needed <= 0 {
-        debug!("{}: have remaining quota of {}", stream.id, -bytes_needed);
+        debug!("{}: have remaining quota of {}", stream_id, -bytes_needed);
         return Ok(());
     }
     let mut n = 0;
-    db.list_oldest_sample_files(stream_id, &mut |row| {
-        bytes_to_delete += row.sample_file_bytes as i64;
-        to_delete.push(row);
-        n += 1;
-        bytes_needed > bytes_to_delete  // continue as long as more deletions are needed.
+    db.delete_oldest_recordings(stream_id, &mut |row| {
+        if bytes_needed >= bytes_to_delete {
+            bytes_to_delete += row.sample_file_bytes as i64;
+            n += 1;
+            return true;
+        }
+        false
     })?;
-    if bytes_needed > bytes_to_delete {
-        bail!("{}: couldn't find enough files to delete: {} left.", stream.id, bytes_needed);
-    }
     info!("{}: deleting {} bytes in {} recordings ({} bytes needed)",
-          stream.id, bytes_to_delete, n, bytes_needed);
+          stream_id, bytes_to_delete, n, bytes_needed);
     Ok(())
 }
 
@@ -498,20 +506,19 @@ impl Syncer {
     /// Rotates files for all streams and deletes stale files from previous runs.
     fn initial_rotation(&mut self) -> Result<(), Error> {
         self.do_rotation(|db| {
-            let mut to_delete = Vec::new();
-            for (stream_id, stream) in db.streams_by_id() {
-                get_rows_to_delete(&db, *stream_id, stream, 0, &mut to_delete)?;
+            let streams: Vec<i32> = db.streams_by_id().keys().map(|&id| id).collect();
+            for &stream_id in &streams {
+                delete_recordings(db, stream_id, 0)?;
             }
-            Ok(to_delete)
+            Ok(())
         })
     }
 
-    fn do_rotation<F>(&mut self, get_rows_to_delete: F) -> Result<(), Error>
-    where F: FnOnce(&db::LockedDatabase) -> Result<Vec<db::ListOldestSampleFilesRow>, Error> {
+    fn do_rotation<F>(&mut self, delete_recordings: F) -> Result<(), Error>
+    where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> {
         {
             let mut db = self.db.lock();
-            let mut to_delete = get_rows_to_delete(&*db)?;
-            db.delete_recordings(&mut to_delete);
+            delete_recordings(&mut *db)?;
             db.flush("synchronous deletion")?;
         }
         self.collect_garbage(false)?;
@@ -576,16 +583,7 @@ impl Syncer {
         let stream_id = id.stream();
 
         // Free up a like number of bytes.
-        {
-            let mut to_delete = Vec::new();
-            let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64;
-            let mut db = self.db.lock();
-            {
-                let stream = db.streams_by_id().get(&stream_id).unwrap();
-                get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap();
-            }
-            db.delete_recordings(&mut to_delete);
-        }
+        delete_recordings(&mut self.db.lock(), stream_id, 0).unwrap();
 
         f.sync_all().unwrap();
         self.dir.sync().unwrap();
diff --git a/db/raw.rs b/db/raw.rs
index 9586d14..6e839de 100644
--- a/db/raw.rs
+++ b/db/raw.rs
@@ -72,6 +72,21 @@ const STREAM_MAX_START_SQL: &'static str = r#"
     order by
       start_time_90k desc;
 "#;
 
+const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#"
+    select
+      composite_id,
+      start_time_90k,
+      duration_90k,
+      sample_file_bytes
+    from
+      recording
+    where
+      :start <= composite_id and
+      composite_id < :end
+    order by
+      composite_id
+"#;
+
 /// Inserts the specified recording (for from `try_flush` only).
 pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId,
@@ -105,33 +120,45 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com
     Ok(())
 }
 
-/// Deletes the given recordings from the `recording` and `recording_playback` tables.
-/// Note they are not fully removed from the database; the ids are transferred to the
-/// `garbage` table.
-pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, rows: &[db::ListOldestSampleFilesRow])
+/// Transfers the given recording range from the `recording` and `recording_playback` tables to the
+/// `garbage` table. `sample_file_dir_id` is assumed to be correct.
+pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, sample_file_dir_id: i32,
+                                ids: Range<CompositeId>)
                                 -> Result<(), Error> {
-    let mut del1 = tx.prepare_cached(
-        "delete from recording_playback where composite_id = :composite_id")?;
-    let mut del2 = tx.prepare_cached(
-        "delete from recording where composite_id = :composite_id")?;
     let mut insert = tx.prepare_cached(r#"
-        insert into garbage (sample_file_dir_id, composite_id)
-        values (:sample_file_dir_id, :composite_id)
+        insert into garbage (sample_file_dir_id, composite_id)
+        select
+          :sample_file_dir_id,
+          composite_id
+        from
+          recording
+        where
+          :start <= composite_id and
+          composite_id < :end
     "#)?;
-    for row in rows {
-        let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?;
-        if changes != 1 {
-            bail!("no such recording_playback {}", row.id);
-        }
-        let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?;
-        if changes != 1 {
-            bail!("no such recording {}", row.id);
-        }
-        insert.execute_named(&[
-            (":sample_file_dir_id", &row.sample_file_dir_id),
-            (":composite_id", &row.id.0)],
-        )?;
-    }
+    let mut del1 = tx.prepare_cached(r#"
+        delete from recording_playback
+        where
+          :start <= composite_id and
+          composite_id < :end
+    "#)?;
+    let mut del2 = tx.prepare_cached(r#"
+        delete from recording
+        where
+          :start <= composite_id and
+          composite_id < :end
+    "#)?;
+    insert.execute_named(&[
+        (":sample_file_dir_id", &sample_file_dir_id),
+        (":start", &ids.start.0),
+        (":end", &ids.end.0),
+    ])?;
+    let p: &[(&str, &rusqlite::types::ToSql)] = &[
+        (":start", &ids.start.0),
+        (":end", &ids.end.0),
+    ];
+    del1.execute_named(p)?;
+    del2.execute_named(p)?;
     Ok(())
 }
 
@@ -202,3 +229,28 @@ pub(crate) fn list_garbage(conn: &rusqlite::Connection, dir_id: i32)
     }
     Ok(garbage)
 }
+
+/// Lists the oldest recordings for a stream, starting with the given id.
+/// `f` should return true as long as further rows are desired.
+pub(crate) fn list_oldest_recordings(conn: &rusqlite::Connection, start: CompositeId,
+                                     f: &mut FnMut(db::ListOldestRecordingsRow) -> bool)
+                                     -> Result<(), Error> {
+    let mut stmt = conn.prepare_cached(LIST_OLDEST_RECORDINGS_SQL)?;
+    let mut rows = stmt.query_named(&[
+        (":start", &start.0),
+        (":end", &CompositeId::new(start.stream() + 1, 0).0),
+    ])?;
+    while let Some(row) = rows.next() {
+        let row = row?;
+        let should_continue = f(db::ListOldestRecordingsRow {
+            id: CompositeId(row.get_checked(0)?),
+            start: recording::Time(row.get_checked(1)?),
+            duration: row.get_checked(2)?,
+            sample_file_bytes: row.get_checked(3)?,
+        });
+        if !should_continue {
+            break;
+        }
+    }
+    Ok(())
+}
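Both LIST_OLDEST_RECORDINGS_SQL and the range form of `delete_recordings` scan the half-open
interval `:start <= composite_id < :end`, and `list_oldest_recordings` derives `:end` as
`CompositeId::new(start.stream() + 1, 0)`. A standalone check of why that bound is safe (a
hypothetical `pack` helper mirroring the id packing sketched earlier, not part of this diff):

    fn pack(stream_id: i32, recording_id: i32) -> i64 {
        // Same layout as CompositeId: stream id in the upper 32 bits.
        (stream_id as i64) << 32 | (recording_id as i64)
    }

    fn main() {
        let (start, end) = (pack(2, 0), pack(3, 0));
        // Every possible recording id of stream 2 falls inside [start, end).
        for &rec in &[0, 1, i32::max_value()] {
            let id = pack(2, rec);
            assert!(start <= id && id < end);
        }
        // Neighboring streams fall outside it.
        assert!(pack(1, i32::max_value()) < start);
        assert!(pack(3, 0) >= end);
    }

Relatedly, note that the rewritten `delete_recordings` runs its garbage insert before the two
deletes: the insert's select reads from `recording`, so those rows must still exist when it
executes.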