garbage collection for signals

This commit is contained in:
Scott Lamb 2019-12-28 07:48:08 -06:00
parent 1fdf6eb022
commit 2bd8963961
3 changed files with 88 additions and 3 deletions

View File

@ -33,7 +33,12 @@
-- Database metadata. There should be exactly one row in this table. -- Database metadata. There should be exactly one row in this table.
create table meta ( create table meta (
uuid blob not null check (length(uuid) = 16) uuid blob not null check (length(uuid) = 16),
-- The maximum number of entries in the signal_state table. If an update
-- causes this to be exceeded, older times will be garbage collected to stay
-- within the limit.
max_signal_changes integer check (max_signal_changes >= 0)
); );
-- This table tracks the schema version. -- This table tracks the schema version.

View File

@ -34,6 +34,7 @@ use crate::db::FromSqlUuid;
use crate::recording; use crate::recording;
use failure::{Error, bail, format_err}; use failure::{Error, bail, format_err};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use log::debug;
use rusqlite::{Connection, Transaction, params}; use rusqlite::{Connection, Transaction, params};
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry; use std::collections::btree_map::Entry;
@ -51,8 +52,11 @@ pub(crate) struct State {
points_by_time: BTreeMap<recording::Time, Point>, points_by_time: BTreeMap<recording::Time, Point>,
/// `points_by_time` entries which need to be flushed to the database. /// Times which need to be flushed to the database.
/// These either have a matching `points_by_time` entry or represent a removal.
dirty_by_time: BTreeSet<recording::Time>, dirty_by_time: BTreeSet<recording::Time>,
max_signal_changes: Option<i64>,
} }
/// Representation of all signals at a point in time. /// Representation of all signals at a point in time.
@ -216,9 +220,12 @@ pub struct ListStateChangesRow {
impl State { impl State {
pub fn init(conn: &Connection) -> Result<Self, Error> { pub fn init(conn: &Connection) -> Result<Self, Error> {
let max_signal_changes: Option<i64> =
conn.query_row("select max_signal_changes from meta", params![], |row| row.get(0))?;
let mut signals_by_id = State::init_signals(conn)?; let mut signals_by_id = State::init_signals(conn)?;
State::fill_signal_cameras(conn, &mut signals_by_id)?; State::fill_signal_cameras(conn, &mut signals_by_id)?;
Ok(State { Ok(State {
max_signal_changes,
signals_by_id, signals_by_id,
types_by_uuid: State::init_types(conn)?, types_by_uuid: State::init_types(conn)?,
points_by_time: State::init_points(conn)?, points_by_time: State::init_points(conn)?,
@ -270,9 +277,35 @@ impl State {
self.update_signals_end(when.end, signals, states); self.update_signals_end(when.end, signals, states);
self.update_signals_start(when.start, signals, states); self.update_signals_start(when.start, signals, states);
self.update_signals_middle(when, signals, states); self.update_signals_middle(when, signals, states);
self.gc();
Ok(()) Ok(())
} }
/// Garbage-collects points if there are more than `max_signal_changes` of them.
///
/// Does nothing when no limit is configured (`max_signal_changes` is `None`), when the
/// limit is too large to be a meaningful point count on this platform, or when the
/// number of points is already within the limit.
///
/// Otherwise removes just enough entries from the front of `points_by_time` (the
/// smallest keys, as `BTreeMap` iterates in key order) to get back down to the limit.
/// Each removed time is added to `dirty_by_time` so the removal is flushed to the
/// database later.
fn gc(&mut self) {
    let max = match self.max_signal_changes {
        None => return,
        // The schema has a `>= 0` check on this column; if a negative value shows up
        // anyway, treat it as "keep nothing".
        Some(m) if m < 0 => 0,
        // A limit this large can never be exceeded by an in-memory map.
        Some(m) if m > (isize::max_value() as i64) => return,
        Some(m) => m as usize,
    };

    // Only proceed when strictly over the limit. Being exactly at the limit is the
    // normal state after a previous GC and must not produce a "removing 0" log line.
    let len = self.points_by_time.len();
    if len <= max {
        return;
    }
    let to_remove = len - max;
    debug!("Performing signal GC: have {} points, want only {}, so removing {}",
           len, max, to_remove);

    // Copy the doomed keys out first; the map can't be mutated while iterating over it.
    let remove: smallvec::SmallVec<[recording::Time; 4]> =
        self.points_by_time.keys().take(to_remove).copied().collect();
    for t in &remove {
        self.points_by_time.remove(t);
        self.dirty_by_time.insert(*t);
    }
}
/// Helper for `update_signals` to do validation. /// Helper for `update_signals` to do validation.
fn update_signals_validate(&self, signals: &[u32], states: &[u16]) -> Result<(), base::Error> { fn update_signals_validate(&self, signals: &[u32], states: &[u16]) -> Result<(), base::Error> {
if signals.len() != states.len() { if signals.len() != states.len() {
@ -696,6 +729,8 @@ mod tests {
let mut conn = Connection::open_in_memory().unwrap(); let mut conn = Connection::open_in_memory().unwrap();
db::init(&mut conn).unwrap(); db::init(&mut conn).unwrap();
conn.execute_batch(r#" conn.execute_batch(r#"
update meta set max_signal_changes = 2;
insert into signal (id, source_uuid, type_uuid, short_name) insert into signal (id, source_uuid, type_uuid, short_name)
values (1, x'1B3889C0A59F400DA24C94EBEB19CC3A', values (1, x'1B3889C0A59F400DA24C94EBEB19CC3A',
x'EE66270FD9C648198B339720D4CBCA6B', 'a'), x'EE66270FD9C648198B339720D4CBCA6B', 'a'),
@ -748,10 +783,53 @@ mod tests {
} }
drop(s); drop(s);
let s = State::init(&conn).unwrap(); let mut s = State::init(&conn).unwrap();
rows.clear(); rows.clear();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r)); &mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED); assert_eq!(&rows[..], EXPECTED);
// Go through it again. This time, exceed the maximum number of signal changes (2), forcing
// START to be dropped.
const SOON: recording::Time = recording::Time(140067473400000); // 2019-04-26T12:01:00
s.update_signals(NOW..SOON, &[1, 2], &[1, 2]).unwrap();
rows.clear();
const EXPECTED2: &[ListStateChangesRow] = &[
ListStateChangesRow {
when: NOW,
signal: 1,
state: 1,
},
ListStateChangesRow {
when: NOW,
signal: 2,
state: 2,
},
ListStateChangesRow {
when: SOON,
signal: 1,
state: 0,
},
ListStateChangesRow {
when: SOON,
signal: 2,
state: 0,
},
];
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED2);
{
let tx = conn.transaction().unwrap();
s.flush(&tx).unwrap();
tx.commit().unwrap();
}
drop(s);
let s = State::init(&conn).unwrap();
rows.clear();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED2);
} }
} }

View File

@ -35,6 +35,8 @@ use failure::Error;
pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> { pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> {
// These create statements match the schema.sql when version 4 was the latest. // These create statements match the schema.sql when version 4 was the latest.
tx.execute_batch(r#" tx.execute_batch(r#"
alter table meta add column max_signal_changes integer check (max_signal_changes >= 0);
create table signal ( create table signal (
id integer primary key, id integer primary key,
source_uuid blob not null check (length(source_uuid) = 16), source_uuid blob not null check (length(source_uuid) = 16),