diff --git a/db/db.rs b/db/db.rs index a0f71ec..8ecafd7 100644 --- a/db/db.rs +++ b/db/db.rs @@ -96,21 +96,6 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = "update stream set next_recording_id = :next_recording_id where id = :stream_id"; -const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" - select - composite_id, - start_time_90k, - duration_90k, - sample_file_bytes - from - recording - where - :start <= composite_id and - composite_id < :end - order by - composite_id -"#; - const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" select recording.composite_id, @@ -223,12 +208,12 @@ pub(crate) struct RecordingToInsert { pub sample_file_sha1: [u8; 20], } -/// A row used in `list_oldest_sample_files`. -#[derive(Debug)] -pub(crate) struct ListOldestSampleFilesRow { +/// A row used in `raw::list_oldest_recordings` and `db::delete_oldest_recordings`. +#[derive(Copy, Clone, Debug)] +pub(crate) struct ListOldestRecordingsRow { pub id: CompositeId, - pub sample_file_dir_id: i32, - pub time: Range, + pub start: recording::Time, + pub duration: i32, pub sample_file_bytes: i32, } @@ -371,6 +356,12 @@ pub struct Stream { pub range: Option>, pub sample_file_bytes: i64, + /// On flush, delete the following recordings. Note they must be the oldest recordings. + to_delete: Vec, + + /// The total bytes to delete. + pub bytes_to_delete: i64, + /// The total duration of recorded data. This may not be `range.end - range.start` due to /// gaps and overlap. pub duration: recording::Duration, @@ -578,10 +569,6 @@ pub struct LockedDatabase { list_recordings_by_time_sql: String, video_index_cache: RefCell, fnv::FnvBuildHasher>>, on_flush: Vec>, - - /// Recordings which have been enqueued for deletion via `LockedDatabase::delete_recordings` - /// but have yet to be committed. - to_delete: Vec, } /// Represents a row of the `open` database table. @@ -719,6 +706,8 @@ impl StreamStateChanger { flush_if: recording::Duration(sc.flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, + to_delete: Vec::new(), + bytes_to_delete: 0, duration: recording::Duration(0), days: BTreeMap::new(), record: sc.record, @@ -779,10 +768,6 @@ impl LockedDatabase { Ok((id, recording)) } - pub(crate) fn delete_recordings(&mut self, rows: &mut Vec) { - self.to_delete.append(rows); - } - pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec) -> Result<(), Error> { let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) { @@ -797,7 +782,7 @@ impl LockedDatabase { /// /// * commits any recordings added with `add_recording` that have since been marked as /// synced. - /// * moves old recordings to the garbage table as requested by `delete_recordings`. + /// * moves old recordings to the garbage table as requested by `delete_oldest_recordings`. /// * removes entries from the garbage table as requested by `mark_sample_files_deleted`. /// /// On success, for each affected sample file directory with a flush watcher set, sends a @@ -810,20 +795,10 @@ impl LockedDatabase { let tx = self.conn.transaction()?; let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), Default::default()); - raw::delete_recordings(&tx, &self.to_delete)?; - for row in &self.to_delete { - // Add a placeholder for recomputing the range. - mods.entry(row.id.stream()).or_insert_with(StreamModification::default); - - let dir = match self.sample_file_dirs_by_id.get_mut(&row.sample_file_dir_id) { - None => bail!("Row refers to nonexistent sample file dir: {:#?}", row), - Some(d) => d, - }; - dir.garbage.insert(row.id); - } { let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; for (&stream_id, s) in &self.streams_by_id { + // Process additions. let mut i = 0; for recording in &s.uncommitted { let l = recording.lock(); @@ -842,6 +817,18 @@ impl LockedDatabase { (":next_recording_id", &(s.next_recording_id + i)), ])?; } + + // Process deletions. + if let Some(l) = s.to_delete.last() { + // Add a placeholder for recomputing the range. + mods.entry(stream_id).or_insert_with(StreamModification::default); + let dir = match s.sample_file_dir_id { + None => bail!("stream {} has no directory!", stream_id), + Some(d) => d, + }; + let end = CompositeId(l.id.0 + 1); + raw::delete_recordings(&tx, dir, CompositeId::new(stream_id, 0) .. end)?; + } } } for dir in self.sample_file_dirs_by_id.values() { @@ -852,15 +839,6 @@ impl LockedDatabase { } tx.commit()?; - // Process delete_recordings. - let deleted = self.to_delete.len(); - for row in self.to_delete.drain(..) { - let s = self.streams_by_id.get_mut(&row.id.stream()).unwrap(); - s.duration -= row.time.end - row.time.start; - s.sample_file_bytes -= row.sample_file_bytes as i64; - adjust_days(row.time, -1, &mut s.days); - } - // Process delete_garbage. let mut gced = 0; for dir in self.sample_file_dirs_by_id.values_mut() { @@ -870,10 +848,24 @@ impl LockedDatabase { } } - // Process add_recordings. let mut added = 0; + let mut deleted = 0; for (stream_id, m) in mods.drain() { let s = self.streams_by_id.get_mut(&stream_id).unwrap(); + let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap(); + + // Process delete_oldest_recordings. + deleted += s.to_delete.len(); + s.sample_file_bytes -= s.bytes_to_delete; + s.bytes_to_delete = 0; + for row in s.to_delete.drain(..) { + d.garbage.insert(row.id); + let d = recording::Duration(row.duration as i64); + s.duration -= d; + adjust_days(row.start .. row.start + d, -1, &mut s.days); + } + + // Process add_recordings. s.next_recording_id += m.num_recordings_to_commit; added += m.num_recordings_to_commit; for _ in 0..m.num_recordings_to_commit { @@ -883,6 +875,8 @@ impl LockedDatabase { s.add_recording(r.time.clone(), r.sample_file_bytes); } } + + // Fix the range. s.range = m.range; } info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", @@ -1145,40 +1139,27 @@ impl LockedDatabase { Err(format_err!("no such recording {}", id)) } - /// Lists the oldest sample files (to delete to free room). - /// `f` should return true as long as further rows are desired. - pub(crate) fn list_oldest_sample_files( - &self, stream_id: i32, f: &mut FnMut(ListOldestSampleFilesRow) -> bool) + /// Deletes the oldest recordings that aren't already queued for deletion. + /// `f` should return true for each row that should be deleted. + pub(crate) fn delete_oldest_recordings( + &mut self, stream_id: i32, f: &mut FnMut(&ListOldestRecordingsRow) -> bool) -> Result<(), Error> { - let s = match self.streams_by_id.get(&stream_id) { + let s = match self.streams_by_id.get_mut(&stream_id) { None => bail!("no stream {}", stream_id), Some(s) => s, }; - let sample_file_dir_id = match s.sample_file_dir_id { - None => bail!("stream {} has no dir", stream_id), - Some(d) => d, + let end = match s.to_delete.last() { + None => 0, + Some(row) => row.id.recording() + 1, }; - let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; - let mut rows = stmt.query_named(&[ - (":start", &CompositeId::new(stream_id, 0).0), - (":end", &CompositeId::new(stream_id + 1, 0).0), - ])?; - while let Some(row) = rows.next() { - let row = row?; - let id = CompositeId(row.get_checked(0)?); - let start = recording::Time(row.get_checked(1)?); - let duration = recording::Duration(row.get_checked(2)?); - let should_continue = f(ListOldestSampleFilesRow{ - id, - sample_file_dir_id, - time: start .. start + duration, - sample_file_bytes: row.get_checked(3)?, - }); - if !should_continue { - break; + raw::list_oldest_recordings(&self.conn, CompositeId::new(stream_id, end), &mut |r| { + if f(&r) { + s.to_delete.push(r); + s.bytes_to_delete += r.sample_file_bytes as i64; + return true; } - } - Ok(()) + false + }) } /// Initializes the video_sample_entries. To be called during construction. @@ -1341,6 +1322,8 @@ impl LockedDatabase { flush_if: recording::Duration(flush_if_sec * recording::TIME_UNITS_PER_SEC), range: None, sample_file_bytes: 0, + to_delete: Vec::new(), + bytes_to_delete: 0, duration: recording::Duration(0), days: BTreeMap::new(), next_recording_id: row.get_checked(7)?, @@ -1717,7 +1700,6 @@ impl Database { video_sample_entries: BTreeMap::new(), video_index_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), list_recordings_by_time_sql, - to_delete: Vec::new(), on_flush: Vec::new(), }))); { @@ -1847,10 +1829,11 @@ mod tests { assert_eq!(1, rows); rows = 0; - db.lock().list_oldest_sample_files(stream_id, &mut |row| { + raw::list_oldest_recordings(&db.lock().conn, CompositeId::new(stream_id, 0), &mut |row| { rows += 1; assert_eq!(recording_id, Some(row.id)); - assert_eq!(r.time, row.time); + assert_eq!(r.time.start, row.start); + assert_eq!(r.time.end - r.time.start, recording::Duration(row.duration as i64)); assert_eq!(r.sample_file_bytes, row.sample_file_bytes); true }).unwrap(); @@ -2053,11 +2036,24 @@ mod tests { // Deleting a recording should succeed, update the min/max times, and mark it as garbage. { let mut db = db.lock(); - let mut v = Vec::new(); - db.list_oldest_sample_files(stream_id, &mut |r| { v.push(r); true }).unwrap(); - assert_eq!(1, v.len()); - db.delete_recordings(&mut v); + let mut n = 0; + db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap(); + assert_eq!(n, 1); + { + let s = db.streams_by_id().get(&stream_id).unwrap(); + assert_eq!(s.sample_file_bytes, 42); + assert_eq!(s.bytes_to_delete, 42); + } + n = 0; + + // A second run + db.delete_oldest_recordings(stream_id, &mut |_| { n += 1; true }).unwrap(); + assert_eq!(n, 0); + assert_eq!(db.streams_by_id().get(&stream_id).unwrap().bytes_to_delete, 42); db.flush("delete test").unwrap(); + let s = db.streams_by_id().get(&stream_id).unwrap(); + assert_eq!(s.sample_file_bytes, 0); + assert_eq!(s.bytes_to_delete, 0); } assert_no_recordings(&db, camera_uuid); let g: Vec<_> = db.lock() diff --git a/db/dir.rs b/db/dir.rs index 5cdbbfb..8f07445 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -357,43 +357,52 @@ pub fn lower_retention(db: Arc, dir_id: i32, limits: &[NewLimit]) let db2 = db.clone(); let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?; syncer.do_rotation(|db| { - let mut to_delete = Vec::new(); for l in limits { - let before = to_delete.len(); - let stream = db.streams_by_id().get(&l.stream_id) - .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?; - if l.limit >= stream.sample_file_bytes { continue } - get_rows_to_delete(db, l.stream_id, stream, stream.retain_bytes - l.limit, - &mut to_delete)?; - info!("stream {}, {}->{}, deleting {} rows", stream.id, - stream.sample_file_bytes, l.limit, to_delete.len() - before); + let (bytes_before, extra); + { + let stream = db.streams_by_id().get(&l.stream_id) + .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?; + bytes_before = stream.sample_file_bytes - stream.bytes_to_delete; + extra = stream.retain_bytes - l.limit; + } + if l.limit >= bytes_before { continue } + delete_recordings(db, l.stream_id, extra)?; + let stream = db.streams_by_id().get(&l.stream_id).unwrap(); + info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before, + stream.sample_file_bytes - stream.bytes_to_delete); } - Ok(to_delete) + Ok(()) }) } -/// Gets rows to delete to bring a stream's disk usage within bounds. -fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32, - stream: &db::Stream, extra_bytes_needed: i64, - to_delete: &mut Vec) -> Result<(), Error> { - let bytes_needed = stream.sample_file_bytes + extra_bytes_needed - stream.retain_bytes; +/// Deletes recordings to bring a stream's disk usage within bounds. +fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, + extra_bytes_needed: i64) -> Result<(), Error> { + let bytes_needed = { + let stream = match db.streams_by_id().get(&stream_id) { + None => bail!("no stream {}", stream_id), + Some(s) => s, + }; + stream.sample_file_bytes - stream.bytes_to_delete + extra_bytes_needed + - stream.retain_bytes + }; let mut bytes_to_delete = 0; if bytes_needed <= 0 { - debug!("{}: have remaining quota of {}", stream.id, -bytes_needed); + debug!("{}: have remaining quota of {}", stream_id, -bytes_needed); return Ok(()); } let mut n = 0; - db.list_oldest_sample_files(stream_id, &mut |row| { - bytes_to_delete += row.sample_file_bytes as i64; - to_delete.push(row); + db.delete_oldest_recordings(stream_id, &mut |row| { n += 1; - bytes_needed > bytes_to_delete // continue as long as more deletions are needed. + if bytes_needed >= bytes_to_delete { + bytes_to_delete += row.sample_file_bytes as i64; + n += 1; + return true; + } + false })?; - if bytes_needed > bytes_to_delete { - bail!("{}: couldn't find enough files to delete: {} left.", stream.id, bytes_needed); - } info!("{}: deleting {} bytes in {} recordings ({} bytes needed)", - stream.id, bytes_to_delete, n, bytes_needed); + stream_id, bytes_to_delete, n, bytes_needed); Ok(()) } @@ -498,20 +507,19 @@ impl Syncer { /// Rotates files for all streams and deletes stale files from previous runs. fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { - let mut to_delete = Vec::new(); - for (stream_id, stream) in db.streams_by_id() { - get_rows_to_delete(&db, *stream_id, stream, 0, &mut to_delete)?; + let streams: Vec = db.streams_by_id().keys().map(|&id| id).collect(); + for &stream_id in &streams { + delete_recordings(db, stream_id, 0)?; } - Ok(to_delete) + Ok(()) }) } - fn do_rotation(&mut self, get_rows_to_delete: F) -> Result<(), Error> - where F: FnOnce(&db::LockedDatabase) -> Result, Error> { + fn do_rotation(&mut self, delete_recordings: F) -> Result<(), Error> + where F: FnOnce(&mut db::LockedDatabase) -> Result<(), Error> { { let mut db = self.db.lock(); - let mut to_delete = get_rows_to_delete(&*db)?; - db.delete_recordings(&mut to_delete); + delete_recordings(&mut *db)?; db.flush("synchronous deletion")?; } self.collect_garbage(false)?; @@ -576,16 +584,7 @@ impl Syncer { let stream_id = id.stream(); // Free up a like number of bytes. - { - let mut to_delete = Vec::new(); - let len = recording.lock().recording.as_ref().unwrap().sample_file_bytes as i64; - let mut db = self.db.lock(); - { - let stream = db.streams_by_id().get(&stream_id).unwrap(); - get_rows_to_delete(&db, stream_id, stream, len, &mut to_delete).unwrap(); - } - db.delete_recordings(&mut to_delete); - } + delete_recordings(&mut self.db.lock(), stream_id, 0).unwrap(); f.sync_all().unwrap(); self.dir.sync().unwrap(); diff --git a/db/raw.rs b/db/raw.rs index 9586d14..6e839de 100644 --- a/db/raw.rs +++ b/db/raw.rs @@ -72,6 +72,21 @@ const STREAM_MAX_START_SQL: &'static str = r#" order by start_time_90k desc; "#; +const LIST_OLDEST_RECORDINGS_SQL: &'static str = r#" + select + composite_id, + start_time_90k, + duration_90k, + sample_file_bytes + from + recording + where + :start <= composite_id and + composite_id < :end + order by + composite_id +"#; + /// Inserts the specified recording (for from `try_flush` only). pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: CompositeId, r: &db::RecordingToInsert) -> Result<(), Error> { @@ -105,33 +120,45 @@ pub(crate) fn insert_recording(tx: &rusqlite::Transaction, o: &db::Open, id: Com Ok(()) } -/// Deletes the given recordings from the `recording` and `recording_playback` tables. -/// Note they are not fully removed from the database; the ids are transferred to the -/// `garbage` table. -pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, rows: &[db::ListOldestSampleFilesRow]) +/// Tranfers the given recording range from the `recording` and `recording_playback` tables to the +/// `garbage` table. `sample_file_dir_id` is assumed to be correct. +pub(crate) fn delete_recordings(tx: &rusqlite::Transaction, sample_file_dir_id: i32, + ids: Range) -> Result<(), Error> { - let mut del1 = tx.prepare_cached( - "delete from recording_playback where composite_id = :composite_id")?; - let mut del2 = tx.prepare_cached( - "delete from recording where composite_id = :composite_id")?; let mut insert = tx.prepare_cached(r#" - insert into garbage (sample_file_dir_id, composite_id) - values (:sample_file_dir_id, :composite_id) + insert into garbage (sample_file_dir_id, composite_id) + select + :sample_file_dir_id, + composite_id + from + recording + where + :start <= composite_id and + composite_id < :end "#)?; - for row in rows { - let changes = del1.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording_playback {}", row.id); - } - let changes = del2.execute_named(&[(":composite_id", &row.id.0)])?; - if changes != 1 { - bail!("no such recording {}", row.id); - } - insert.execute_named(&[ - (":sample_file_dir_id", &row.sample_file_dir_id), - (":composite_id", &row.id.0)], - )?; - } + let mut del1 = tx.prepare_cached(r#" + delete from recording_playback + where + :start <= composite_id and + composite_id < :end + "#)?; + let mut del2 = tx.prepare_cached(r#" + delete from recording + where + :start <= composite_id and + composite_id < :end + "#)?; + insert.execute_named(&[ + (":sample_file_dir_id", &sample_file_dir_id), + (":start", &ids.start.0), + (":end", &ids.end.0), + ])?; + let p: &[(&str, &rusqlite::types::ToSql)] = &[ + (":start", &ids.start.0), + (":end", &ids.end.0), + ]; + del1.execute_named(p)?; + del2.execute_named(p)?; Ok(()) } @@ -202,3 +229,28 @@ pub(crate) fn list_garbage(conn: &rusqlite::Connection, dir_id: i32) } Ok(garbage) } + +/// Lists the oldest recordings for a stream, starting with the given id. +/// `f` should return true as long as further rows are desired. +pub(crate) fn list_oldest_recordings(conn: &rusqlite::Connection, start: CompositeId, + f: &mut FnMut(db::ListOldestRecordingsRow) -> bool) + -> Result<(), Error> { + let mut stmt = conn.prepare_cached(LIST_OLDEST_RECORDINGS_SQL)?; + let mut rows = stmt.query_named(&[ + (":start", &start.0), + (":end", &CompositeId::new(start.stream() + 1, 0).0), + ])?; + while let Some(row) = rows.next() { + let row = row?; + let should_continue = f(db::ListOldestRecordingsRow { + id: CompositeId(row.get_checked(0)?), + start: recording::Time(row.get_checked(1)?), + duration: row.get_checked(2)?, + sample_file_bytes: row.get_checked(3)?, + }); + if !should_continue { + break; + } + } + Ok(()) +}