diff --git a/db/schema.sql b/db/schema.sql index 95d50bb..20b22fc 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -33,7 +33,12 @@ -- Database metadata. There should be exactly one row in this table. create table meta ( - uuid blob not null check (length(uuid) = 16) + uuid blob not null check (length(uuid) = 16), + + -- The maximum number of entries in the signal_state table. If an update + -- causes this to be exceeded, older times will be garbage collected to stay + -- within the limit. + max_signal_changes integer check (max_signal_changes >= 0) ); -- This table tracks the schema version. diff --git a/db/signal.rs b/db/signal.rs index 0b8c5cf..e9fa42a 100644 --- a/db/signal.rs +++ b/db/signal.rs @@ -34,6 +34,7 @@ use crate::db::FromSqlUuid; use crate::recording; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; +use log::debug; use rusqlite::{Connection, Transaction, params}; use std::collections::{BTreeMap, BTreeSet}; use std::collections::btree_map::Entry; @@ -51,8 +52,11 @@ pub(crate) struct State { points_by_time: BTreeMap, - /// `points_by_time` entries which need to be flushed to the database. + /// Times which need to be flushed to the database. + /// These either have a matching `points_by_time` entry or represent a removal. dirty_by_time: BTreeSet, + + max_signal_changes: Option, } /// Representation of all signals at a point in time. @@ -216,9 +220,12 @@ pub struct ListStateChangesRow { impl State { pub fn init(conn: &Connection) -> Result { + let max_signal_changes: Option = + conn.query_row("select max_signal_changes from meta", params![], |row| row.get(0))?; let mut signals_by_id = State::init_signals(conn)?; State::fill_signal_cameras(conn, &mut signals_by_id)?; Ok(State { + max_signal_changes, signals_by_id, types_by_uuid: State::init_types(conn)?, points_by_time: State::init_points(conn)?, @@ -270,9 +277,35 @@ impl State { self.update_signals_end(when.end, signals, states); self.update_signals_start(when.start, signals, states); self.update_signals_middle(when, signals, states); + + self.gc(); Ok(()) } + /// Performs garbage collection if the number of points exceeds `max_signal_changes`. + fn gc(&mut self) { + let max = match self.max_signal_changes { + None => return, + Some(m) if m < 0 => 0 as usize, + Some(m) if m > (isize::max_value() as i64) => return, + Some(m) => m as usize, + }; + let to_remove = match self.points_by_time.len().checked_sub(max) { + None => return, + Some(p) => p, + }; + debug!("Performing signal GC: have {} points, want only {}, so removing {}", + self.points_by_time.len(), max, to_remove); + + let remove: smallvec::SmallVec<[recording::Time; 4]> = + self.points_by_time.keys().take(to_remove).map(|p| *p).collect(); + + for p in &remove { + self.points_by_time.remove(p); + self.dirty_by_time.insert(*p); + } + } + /// Helper for `update_signals` to do validation. fn update_signals_validate(&self, signals: &[u32], states: &[u16]) -> Result<(), base::Error> { if signals.len() != states.len() { @@ -696,6 +729,8 @@ mod tests { let mut conn = Connection::open_in_memory().unwrap(); db::init(&mut conn).unwrap(); conn.execute_batch(r#" + update meta set max_signal_changes = 2; + insert into signal (id, source_uuid, type_uuid, short_name) values (1, x'1B3889C0A59F400DA24C94EBEB19CC3A', x'EE66270FD9C648198B339720D4CBCA6B', 'a'), @@ -748,10 +783,53 @@ mod tests { } drop(s); - let s = State::init(&conn).unwrap(); + let mut s = State::init(&conn).unwrap(); rows.clear(); s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), &mut |r| rows.push(*r)); assert_eq!(&rows[..], EXPECTED); + + // Go through it again. This time, hit the max number of signals, forcing START to be + // dropped. + const SOON: recording::Time = recording::Time(140067473400000); // 2019-04-26T12:01:00 + s.update_signals(NOW..SOON, &[1, 2], &[1, 2]).unwrap(); + rows.clear(); + const EXPECTED2: &[ListStateChangesRow] = &[ + ListStateChangesRow { + when: NOW, + signal: 1, + state: 1, + }, + ListStateChangesRow { + when: NOW, + signal: 2, + state: 2, + }, + ListStateChangesRow { + when: SOON, + signal: 1, + state: 0, + }, + ListStateChangesRow { + when: SOON, + signal: 2, + state: 0, + }, + ]; + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |r| rows.push(*r)); + assert_eq!(&rows[..], EXPECTED2); + + { + let tx = conn.transaction().unwrap(); + s.flush(&tx).unwrap(); + tx.commit().unwrap(); + } + drop(s); + let s = State::init(&conn).unwrap(); + rows.clear(); + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |r| rows.push(*r)); + assert_eq!(&rows[..], EXPECTED2); } } diff --git a/db/upgrade/v3_to_v4.rs b/db/upgrade/v3_to_v4.rs index 8af214f..3bbb998 100644 --- a/db/upgrade/v3_to_v4.rs +++ b/db/upgrade/v3_to_v4.rs @@ -35,6 +35,8 @@ use failure::Error; pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> { // These create statements match the schema.sql when version 4 was the latest. tx.execute_batch(r#" + alter table meta add column max_signal_changes integer check (max_signal_changes >= 0); + create table signal ( id integer primary key, source_uuid blob not null check (length(source_uuid) = 16),