From 7dd98bb76afdad625339d6048ab87f614e34b29f Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 13 Jun 2019 21:55:15 -0700 Subject: [PATCH] db crate support for updating signals (#28) This is a definite work in progress. In particular, * there's no src/web.rs support yet so it can't be used, * the code is surprisingly complex, and there's almost no tests so far. I want to at least get complete branch coverage. * I may still go back to time_sec rather than time_90k to save RAM and flash. I simplified the approach a bit from the earlier goal in design/api.md. In particular, there's no longer the separate concept of "observation" vs "prediction". Now the predictions are just observations that extend a bit beyond now. They may be flushed prematurely and I'll try living with that to avoid making things even more complex. --- db/auth.rs | 9 +- db/db.rs | 10 +- db/recording.rs | 13 +- db/schema.sql | 24 +- db/signal.rs | 517 ++++++++++++++++++++++++++++++++++++----- db/upgrade/v3_to_v4.rs | 2 +- design/api.md | 89 +++---- src/web.rs | 6 +- 8 files changed, 538 insertions(+), 132 deletions(-) diff --git a/db/auth.rs b/db/auth.rs index bacfcb9..2bb585c 100644 --- a/db/auth.rs +++ b/db/auth.rs @@ -609,7 +609,11 @@ impl State { Ok(()) } - pub fn flush(&mut self, tx: &Transaction) -> Result<(), Error> { + /// Flushes all pending database changes to the given transaction. + /// + /// The caller is expected to call `post_flush` afterward if the transaction is + /// successfully committed. + pub fn flush(&self, tx: &Transaction) -> Result<(), Error> { let mut u_stmt = tx.prepare(r#" update user set @@ -655,6 +659,9 @@ impl State { Ok(()) } + /// Marks that the previous `flush` was completed successfully. + /// + /// See notes there. pub fn post_flush(&mut self) { for (_, u) in &mut self.users_by_id { u.dirty = false; diff --git a/db/db.rs b/db/db.rs index 69e58c5..7bb03dc 100644 --- a/db/db.rs +++ b/db/db.rs @@ -967,6 +967,7 @@ impl LockedDatabase { } } self.auth.flush(&tx)?; + self.signal.flush(&tx)?; tx.commit()?; // Process delete_garbage. @@ -1010,6 +1011,7 @@ impl LockedDatabase { s.range = new_range; } self.auth.post_flush(); + self.signal.post_flush(); self.flush_count += 1; info!("Flush {} (why: {}): added {} recordings ({}), deleted {} ({}), marked {} ({}) GCed.", self.flush_count, reason, added.len(), added.iter().join(", "), deleted.len(), @@ -1794,10 +1796,14 @@ impl LockedDatabase { self.signal.types_by_uuid() } pub fn list_changes_by_time( - &self, desired_time: Range, f: &mut FnMut(&signal::ListStateChangesRow)) - -> Result<(), Error> { + &self, desired_time: Range, f: &mut FnMut(&signal::ListStateChangesRow)) { self.signal.list_changes_by_time(desired_time, f) } + pub fn update_signals( + &mut self, when: Range, signals: &[u32], states: &[u16]) + -> Result<(), Error> { + self.signal.update_signals(when, signals, states) + } } /// Initializes a database. diff --git a/db/recording.rs b/db/recording.rs index dd28ef8..07c12ac 100644 --- a/db/recording.rs +++ b/db/recording.rs @@ -45,7 +45,7 @@ pub const DESIRED_RECORDING_DURATION: i64 = 60 * TIME_UNITS_PER_SEC; pub const MAX_RECORDING_DURATION: i64 = 5 * 60 * TIME_UNITS_PER_SEC; /// A time specified as 90,000ths of a second since 1970-01-01 00:00:00 UTC. -#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd)] pub struct Time(pub i64); impl Time { @@ -53,6 +53,9 @@ impl Time { Time(tm.sec * TIME_UNITS_PER_SEC + tm.nsec as i64 * TIME_UNITS_PER_SEC / 1_000_000_000) } + pub const fn min_value() -> Self { Time(i64::min_value()) } + pub const fn max_value() -> Self { Time(i64::max_value()) } + /// Parses a time as either 90,000ths of a second since epoch or a RFC 3339-like string. /// /// The former is 90,000ths of a second since 1970-01-01T00:00:00 UTC, excluding leap seconds. @@ -121,6 +124,7 @@ impl Time { Ok(Time(sec * TIME_UNITS_PER_SEC + fraction)) } + /// Convert to unix seconds by floor method (rounding down). pub fn unix_seconds(&self) -> i64 { self.0 / TIME_UNITS_PER_SEC } } @@ -143,6 +147,13 @@ impl ops::Sub for Time { fn sub(self, rhs: Duration) -> Time { Time(self.0 - rhs.0) } } +impl fmt::Debug for Time { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Write both the raw and display forms. + write!(f, "{} /* {} */", self.0, self) + } +} + impl fmt::Display for Time { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let tm = time::at(time::Timespec{sec: self.0 / TIME_UNITS_PER_SEC, nsec: 0}); diff --git a/db/schema.sql b/db/schema.sql index a672d33..b047a3e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -460,22 +460,20 @@ create table signal_camera ( primary key (signal_id, camera_id) ) without rowid; --- State of signals as of a given timestamp. -create table signal_state ( - -- seconds since 1970-01-01 00:00:00 UTC. - time_sec integer primary key, +-- Changes to signals as of a given timestamp. +create table signal_change ( + -- Event time, in 90 kHz units since 1970-01-01 00:00:00Z excluding leap seconds. + time_90k integer primary key, -- Changes at this timestamp. -- - -- It's possible for a single signal to have multiple states; this means - -- that the signal held a state only momentarily. - -- - -- A blob of varints representing a list of (signal number delta, state) - -- pairs. For example, - -- input signals: 1 3 3 200 (must be sorted) - -- deltas: 1 2 0 197 (must be non-negative) - -- states: 1 1 0 2 - -- varint: \x01 \x01 \x02 \x01 \x00 \x00 \xc5 \x01 \x02 + -- A blob of varints representing a list of + -- (signal number - next allowed, state) pairs, where signal number is + -- non-decreasing. For example, + -- input signals: 1 3 200 (must be sorted) + -- delta: 1 1 196 (must be non-negative) + -- states: 1 1 2 + -- varint: \x01 \x01 \x01 \x01 \xc4 \x01 \x02 changes blob ); diff --git a/db/signal.rs b/db/signal.rs index 5451582..1b9edd9 100644 --- a/db/signal.rs +++ b/db/signal.rs @@ -28,60 +28,126 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use base::bail_t; use crate::coding; use crate::db::FromSqlUuid; use crate::recording; use failure::{Error, bail, format_err}; use fnv::FnvHashMap; -use rusqlite::{Connection, types::ToSql}; -use std::collections::BTreeMap; +use rusqlite::{Connection, Transaction, params}; +use std::collections::{BTreeMap, BTreeSet}; +use std::collections::btree_map::Entry; use std::ops::Range; use uuid::Uuid; /// All state associated with signals. This is the entry point to this module. pub(crate) struct State { signals_by_id: BTreeMap, + + /// All types with known states. Note that currently there's no requirement an entry here + /// exists for every `type_` specified in a `Signal`, and there's an implied `0` (unknown) + /// state for every `Type`. types_by_uuid: FnvHashMap, - points_by_time: BTreeMap, + + points_by_time: BTreeMap, + + /// `points_by_time` entries which need to be flushed to the database. + dirty_by_time: BTreeSet, } +/// Representation of all signals at a point in time. +/// Each point matches a `signal_change` table row (when flushed). However, the in-memory +/// representation keeps not only the changes as of that time but also the complete prior state. +#[derive(Default)] struct Point { - data: Vec, + /// All data associated with the point. + /// + /// `data[0..changes_off]` represents previous state (immediately prior to this point). + /// `data[changes_off..]` represents the changes at this point. + /// + /// This representation could be 8 bytes shorter on 64-bit platforms by using a u32 for the + /// lengths, but this would require some unsafe code. + /// + /// The serialized form stored here must always be valid. + data: Box<[u8]>, changes_off: usize, } impl Point { - fn new(cur: &BTreeMap, changes: &[u8]) -> Self { - let mut data = Vec::with_capacity(changes.len()); - let mut last_signal = 0; - for (&signal, &state) in cur { - let delta = (signal - last_signal) as u32; - coding::append_varint32(delta, &mut data); - coding::append_varint32(state as u32, &mut data); - last_signal = signal; - } + /// Creates a new point from `prev` and `changes`. + /// + /// The caller is responsible for validation. In particular, `changes` must be a valid + /// serialized form. + fn new(prev: &BTreeMap, changes: &[u8]) -> Self { + let mut data = Vec::with_capacity(3 * prev.len() + changes.len()); + append_serialized(prev, &mut data); let changes_off = data.len(); data.extend(changes); Point { - data, + data: data.into_boxed_slice(), changes_off, } } - fn cur(&self) -> PointDataIterator { + fn swap(&mut self, other: &mut Point) { + std::mem::swap(&mut self.data, &mut other.data); + std::mem::swap(&mut self.changes_off, &mut other.changes_off); + } + + /// Returns an iterator over state as of immediately before this point. + fn prev(&self) -> PointDataIterator { PointDataIterator::new(&self.data[0..self.changes_off]) } + /// Returns an iterator over changes in this point. fn changes(&self) -> PointDataIterator { PointDataIterator::new(&self.data[self.changes_off..]) } + + /// Returns a mapping of signals to states immediately after this point. + fn after(&self) -> BTreeMap { + let mut after = BTreeMap::new(); + let mut it = self.prev(); + while let Some((signal, state)) = it.next().expect("in-mem prev is valid") { + after.insert(signal, state); + } + let mut it = self.changes(); + while let Some((signal, state)) = it.next().expect("in-mem changes is valid") { + if state == 0 { + after.remove(&signal); + } else { + after.insert(signal, state); + } + } + after + } +} + +/// Appends a serialized form of `from` into `to`. +/// +/// `from` must be an iterator of `(signal, state)` with signal numbers in monotonically increasing +/// order. +fn append_serialized<'a, I>(from: I, to: &mut Vec) +where I: IntoIterator { + let mut next_allowed = 0; + for (&signal, &state) in from.into_iter() { + assert!(signal >= next_allowed); + coding::append_varint32(signal - next_allowed, to); + coding::append_varint32(state as u32, to); + next_allowed = signal + 1; + } +} + +fn serialize(from: &BTreeMap) -> Vec { + let mut to = Vec::with_capacity(3 * from.len()); + append_serialized(from, &mut to); + to } struct PointDataIterator<'a> { data: &'a [u8], cur_pos: usize, cur_signal: u32, - cur_state: u16, } impl<'a> PointDataIterator<'a> { @@ -90,10 +156,12 @@ impl<'a> PointDataIterator<'a> { data, cur_pos: 0, cur_signal: 0, - cur_state: 0, } } + /// Returns an error, `None`, or `Some((signal, state))`. + /// Note that errors should be impossible on in-memory data; this returns `Result` for + /// validating blobs as they're read from the database. fn next(&mut self) -> Result, Error> { if self.cur_pos == self.data.len() { return Ok(None); @@ -111,9 +179,16 @@ impl<'a> PointDataIterator<'a> { bail!("state overflow: {}", state); } self.cur_pos = p; - self.cur_signal = signal; - self.cur_state = state as u16; - Ok(Some((signal, self.cur_state))) + self.cur_signal = signal + 1; + Ok(Some((signal, state as u16))) + } + + fn to_map(mut self) -> Result, Error> { + let mut out = BTreeMap::new(); + while let Some((signal, state)) = self.next()? { + out.insert(signal, state); + } + Ok(out) } } @@ -132,7 +207,7 @@ pub enum SignalCameraType { Indirect = 1, } -#[derive(Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct ListStateChangesRow { pub when: recording::Time, pub signal: u32, @@ -147,37 +222,18 @@ impl State { signals_by_id, types_by_uuid: State::init_types(conn)?, points_by_time: State::init_points(conn)?, + dirty_by_time: BTreeSet::new(), }) } pub fn list_changes_by_time( - &self, desired_time: Range, f: &mut FnMut(&ListStateChangesRow)) - -> Result<(), Error> { - - // Convert the desired range to seconds. Reducing precision of the end carefully. - let start = desired_time.start.unix_seconds() as u32; - let mut end = desired_time.end.unix_seconds(); - end += ((end * recording::TIME_UNITS_PER_SEC) < desired_time.end.0) as i64; - let end = end as u32; + &self, desired_time: Range, f: &mut FnMut(&ListStateChangesRow)) { // First find the state immediately before. If it exists, include it. - if let Some((&t, p)) = self.points_by_time.range(..start).next_back() { - let mut cur = BTreeMap::new(); - let mut it = p.cur(); - while let Some((signal, state)) = it.next()? { - cur.insert(signal, state); - } - let mut it = p.changes(); - while let Some((signal, state)) = it.next()? { - if state == 0 { - cur.remove(&signal); - } else { - cur.insert(signal, state); - } - } - for (&signal, &state) in &cur { + if let Some((&when, p)) = self.points_by_time.range(..desired_time.start).next_back() { + for (&signal, &state) in &p.after() { f(&ListStateChangesRow { - when: recording::Time(t as i64 * recording::TIME_UNITS_PER_SEC), + when, signal, state, }); @@ -185,20 +241,283 @@ impl State { } // Then include changes up to (but not including) the end time. - for (&t, p) in self.points_by_time.range(start..end) { + for (&when, p) in self.points_by_time.range(desired_time.clone()) { let mut it = p.changes(); - while let Some((signal, state)) = it.next()? { + while let Some((signal, state)) = it.next().expect("in-mem changes is valid") { f(&ListStateChangesRow { - when: recording::Time(t as i64 * recording::TIME_UNITS_PER_SEC), + when, signal, state, }); } } + } + pub fn update_signals( + &mut self, when: Range, signals: &[u32], states: &[u16]) + -> Result<(), Error> { + // Do input validation before any mutation. + self.update_signals_validate(signals, states)?; + + // Follow the std::ops::Range convention of considering a range empty if its start >= end. + // Bailing early in the empty case isn't just an optimization; apply_observation_end would + // be incorrect otherwise. + if when.end <= when.start { + return Ok(()); + } + + // Apply the end before the start so that the `prev` state can be examined. + self.update_signals_end(when.end, signals, states); + self.update_signals_start(when.start, signals, states); + self.update_signals_middle(when, signals, states); Ok(()) } + /// Helper for `update_signals` to do validation. + fn update_signals_validate(&self, signals: &[u32], states: &[u16]) -> Result<(), Error> { + if signals.len() != states.len() { + bail_t!(InvalidArgument, "signals and states must have same length"); + } + let mut next_allowed = 0u32; + for (&signal, &state) in signals.iter().zip(states) { + if signal < next_allowed { + bail_t!(InvalidArgument, "signals must be monotonically increasing"); + } + match self.signals_by_id.get(&signal) { + None => bail_t!(InvalidArgument, "unknown signal {}", signal), + Some(ref s) => { + let empty = Vec::new(); + let states = self.types_by_uuid.get(&s.type_) + .map(|t| &t.states) + .unwrap_or(&empty); + if signal != 0 && states.binary_search_by_key(&state, |s| s.value).is_err() { + bail_t!(FailedPrecondition, "signal {} specifies unknown state {}", + signal, state); + } + }, + } + next_allowed = signal + 1; + } + Ok(()) + } + + /// Helper for `update_signals` to apply the end point. + fn update_signals_end(&mut self, end: recording::Time, signals: &[u32], states: &[u16]) { + let mut prev; + let mut changes = BTreeMap::::new(); + if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=end).next_back() { + if t == end { + // Already have a point at end. Adjust it. prev starts unchanged... + prev = p.prev().to_map().expect("in-mem prev is valid"); + + // ...and then prev and changes are altered to reflect the desired update. + State::update_signals_end_maps(signals, states, &mut prev, &mut changes); + + // If this doesn't alter the new state, don't dirty the database. + if changes.is_empty() { + return; + } + + // Any existing changes should still be applied. They win over reverting to prev. + let mut it = p.changes(); + while let Some((signal, state)) = it.next().expect("in-mem changes is valid") { + changes.entry(signal).and_modify(|e| *e = state).or_insert(state); + } + self.dirty_by_time.insert(t); + p.swap(&mut Point::new(&prev, &serialize(&changes))); + return; + } + + // Don't have a point at end, but do have previous state. + prev = p.after(); + } else { + // No point at or before end. Start from scratch (all signals unknown). + prev = BTreeMap::new(); + } + + // Create a new end point if necessary. + State::update_signals_end_maps(signals, states, &mut prev, &mut changes); + if changes.is_empty() { + return; + } + self.dirty_by_time.insert(end); + self.points_by_time.insert(end, Point::new(&prev, &serialize(&changes))); + } + + /// Helper for `update_signals_end`. Adjusts `prev` (the state prior to the end point) to + /// reflect the desired update (in `signals` and `states`). Adjusts `changes` (changes to + /// execute at the end point) to undo the change. + fn update_signals_end_maps(signals: &[u32], states: &[u16], prev: &mut BTreeMap, + changes: &mut BTreeMap) { + for (&signal, &state) in signals.iter().zip(states) { + match prev.entry(signal) { + Entry::Vacant(e) => { + changes.insert(signal, 0); + e.insert(state); + }, + Entry::Occupied(mut e) => { + if state == 0 { + changes.insert(signal, *e.get()); + e.remove(); + } else if *e.get() != state { + changes.insert(signal, *e.get()); + *e.get_mut() = state; + } + }, + } + } + } + + /// Helper for `update_signals` to apply the start point. + fn update_signals_start(&mut self, start: recording::Time, signals: &[u32], states: &[u16]) { + let prev; + if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=start).next_back() { + if t == start { + // Reuse existing point at start. + prev = p.prev().to_map().expect("in-mem prev is valid"); + let mut changes = p.changes().to_map().expect("in-mem changes is valid"); + let mut dirty = false; + for (&signal, &state) in signals.iter().zip(states) { + match changes.entry(signal) { + Entry::Occupied(mut e) => { + if *e.get() != state { + dirty = true; + if state == *prev.get(&signal).unwrap_or(&0) { + e.remove(); + } else { + *e.get_mut() = state; + } + } + }, + Entry::Vacant(e) => { + if signal != 0 { + dirty = true; + e.insert(state); + } + }, + } + } + if dirty { + p.swap(&mut Point::new(&prev, &serialize(&changes))); + self.dirty_by_time.insert(start); + } + return; + } + + // Create new point at start, using state from previous point. + prev = p.after(); + } else { + // Create new point at start, from scratch. + prev = BTreeMap::new(); + } + + let mut changes = BTreeMap::new(); + for (&signal, &state) in signals.iter().zip(states) { + if state != *prev.get(&signal).unwrap_or(&0) { + changes.insert(signal, state); + } + } + + if changes.is_empty() { + return; + } + + self.dirty_by_time.insert(start); + self.points_by_time.insert(start, Point::new(&prev, &serialize(&changes))); + } + + /// Helper for `update_signals` to apply all points in `(when.start, when.end)`. + fn update_signals_middle(&mut self, when: Range, signals: &[u32], + states: &[u16]) { + let mut to_delete = Vec::new(); + let after_start = recording::Time(when.start.0+1); + for (&t, ref mut p) in self.points_by_time.range_mut(after_start..when.end) { + let mut prev = p.prev().to_map().expect("in-mem prev is valid"); + + // Update prev to reflect desired update. + for (&signal, &state) in signals.iter().zip(states) { + match prev.entry(signal) { + Entry::Occupied(mut e) => { + if state == 0 { + e.remove_entry(); + } else if *e.get() != state { + *e.get_mut() = state; + } + }, + Entry::Vacant(e) => { + if state != 0 { + e.insert(state); + } + } + } + } + + // Trim changes to omit any change to signals. + let mut changes = Vec::with_capacity(3*signals.len()); + let mut it = p.changes(); + let mut next_allowed = 0; + let mut dirty = false; + while let Some((signal, state)) = it.next().expect("in-memory changes is valid") { + if signals.binary_search(&signal).is_ok() { // discard. + dirty = true; + } else { // keep. + assert!(signal >= next_allowed); + coding::append_varint32(signal - next_allowed, &mut changes); + coding::append_varint32(state as u32, &mut changes); + next_allowed = signal + 1; + } + } + if changes.is_empty() { + to_delete.push(t); + } else { + p.swap(&mut Point::new(&prev, &changes)); + } + if dirty { + self.dirty_by_time.insert(t); + } + } + + // Delete any points with no more changes. + for &t in &to_delete { + self.points_by_time.remove(&t).expect("point exists"); + } + } + + /// Flushes all pending database changes to the given transaction. + /// + /// The caller is expected to call `post_flush` afterward if the transaction is + /// successfully committed. No mutations should happen between these calls. + pub fn flush(&mut self, tx: &Transaction) -> Result<(), Error> { + let mut i_stmt = tx.prepare(r#" + insert or replace into signal_change (time_90k, changes) values (?, ?) + "#)?; + let mut d_stmt = tx.prepare(r#" + delete from signal_change where time_90k = ? + "#)?; + for &t in &self.dirty_by_time { + match self.points_by_time.entry(t) { + Entry::Occupied(ref e) => { + let p = e.get(); + i_stmt.execute(params![ + t.0, + &p.data[p.changes_off..], + ])?; + }, + Entry::Vacant(_) => { + d_stmt.execute(&[t.0])?; + }, + } + } + Ok(()) + } + + /// Marks that the previous `flush` was completed successfully. + /// + /// See notes there. + pub fn post_flush(&mut self) { + self.dirty_by_time.clear(); + } + fn init_signals(conn: &Connection) -> Result, Error> { let mut signals = BTreeMap::new(); let mut stmt = conn.prepare(r#" @@ -210,7 +529,7 @@ impl State { from signal "#)?; - let mut rows = stmt.query(&[] as &[&ToSql])?; + let mut rows = stmt.query(params![])?; while let Some(row) = rows.next()? { let id = row.get(0)?; let source: FromSqlUuid = row.get(1)?; @@ -226,20 +545,20 @@ impl State { Ok(signals) } - fn init_points(conn: &Connection) -> Result, Error> { + fn init_points(conn: &Connection) -> Result, Error> { let mut stmt = conn.prepare(r#" select - time_sec, + time_90k, changes from - signal_state - order by time_sec + signal_change + order by time_90k "#)?; - let mut rows = stmt.query(&[] as &[&ToSql])?; + let mut rows = stmt.query(params![])?; let mut points = BTreeMap::new(); let mut cur = BTreeMap::new(); // latest signal -> state, where state != 0 while let Some(row) = rows.next()? { - let time_sec = row.get(0)?; + let time_90k = recording::Time(row.get(0)?); let changes = row.get_raw_checked(1)?.as_blob()?; let mut it = PointDataIterator::new(changes); while let Some((signal, state)) = it.next()? { @@ -249,7 +568,7 @@ impl State { cur.insert(signal, state); } } - points.insert(time_sec, Point::new(&cur, changes)); + points.insert(time_90k, Point::new(&cur, changes)); } Ok(points) } @@ -266,7 +585,7 @@ impl State { signal_camera order by signal_id, camera_id "#)?; - let mut rows = stmt.query(&[] as &[&ToSql])?; + let mut rows = stmt.query(params![])?; while let Some(row) = rows.next()? { let signal_id = row.get(0)?; let s = signals.get_mut(&signal_id) @@ -298,7 +617,7 @@ impl State { signal_type_enum order by type_uuid, value "#)?; - let mut rows = stmt.query(&[] as &[&ToSql])?; + let mut rows = stmt.query(params![])?; while let Some(row) = rows.next()? { let type_: FromSqlUuid = row.get(0)?; types.entry(type_.0).or_insert_with(Type::default).states.push(TypeState { @@ -346,15 +665,93 @@ pub struct Type { #[cfg(test)] mod tests { + use crate::{db, testutil}; + use rusqlite::Connection; + use super::*; + #[test] fn test_point_data_it() { // Example taken from the .sql file. - let data = b"\x01\x01\x02\x01\x00\x00\xc5\x01\x02"; + let data = b"\x01\x01\x01\x01\xc4\x01\x02"; let mut it = super::PointDataIterator::new(data); assert_eq!(it.next().unwrap(), Some((1, 1))); assert_eq!(it.next().unwrap(), Some((3, 1))); - assert_eq!(it.next().unwrap(), Some((3, 0))); assert_eq!(it.next().unwrap(), Some((200, 2))); assert_eq!(it.next().unwrap(), None); } + + #[test] + fn test_empty_db() { + testutil::init(); + let mut conn = Connection::open_in_memory().unwrap(); + db::init(&mut conn).unwrap(); + let s = State::init(&conn).unwrap(); + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |_r| panic!("no changes expected")); + } + + #[test] + fn round_trip() { + testutil::init(); + let mut conn = Connection::open_in_memory().unwrap(); + db::init(&mut conn).unwrap(); + conn.execute_batch(r#" + insert into signal (id, source_uuid, type_uuid, short_name) + values (1, x'1B3889C0A59F400DA24C94EBEB19CC3A', + x'EE66270FD9C648198B339720D4CBCA6B', 'a'), + (2, x'A4A73D9A53424EBCB9F6366F1E5617FA', + x'EE66270FD9C648198B339720D4CBCA6B', 'b'); + + insert into signal_type_enum (type_uuid, value, name, motion, color) + values (x'EE66270FD9C648198B339720D4CBCA6B', 1, 'still', 0, 'black'), + (x'EE66270FD9C648198B339720D4CBCA6B', 2, 'moving', 1, 'red'); + "#).unwrap(); + let mut s = State::init(&conn).unwrap(); + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |_r| panic!("no changes expected")); + const START: recording::Time = recording::Time(140067462600000); // 2019-04-26T11:59:00 + const NOW: recording::Time = recording::Time(140067468000000); // 2019-04-26T12:00:00 + s.update_signals(START..NOW, &[1, 2], &[2, 1]).unwrap(); + let mut rows = Vec::new(); + + const EXPECTED: &[ListStateChangesRow] = &[ + ListStateChangesRow { + when: START, + signal: 1, + state: 2, + }, + ListStateChangesRow { + when: START, + signal: 2, + state: 1, + }, + ListStateChangesRow { + when: NOW, + signal: 1, + state: 0, + }, + ListStateChangesRow { + when: NOW, + signal: 2, + state: 0, + }, + ]; + + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |r| rows.push(*r)); + assert_eq!(&rows[..], EXPECTED); + + { + let tx = conn.transaction().unwrap(); + s.flush(&tx).unwrap(); + tx.commit().unwrap(); + } + + drop(s); + let s = State::init(&conn).unwrap(); + rows.clear(); + s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(), + &mut |r| rows.push(*r)); + assert_eq!(&rows[..], EXPECTED); + } } diff --git a/db/upgrade/v3_to_v4.rs b/db/upgrade/v3_to_v4.rs index 8fe1b8c..2cae498 100644 --- a/db/upgrade/v3_to_v4.rs +++ b/db/upgrade/v3_to_v4.rs @@ -59,7 +59,7 @@ pub fn run(_args: &super::Args, tx: &rusqlite::Transaction) -> Result<(), Error> ) without rowid; create table signal_state ( - time_sec integer primary key, + time_90k integer primary key, changes blob ); "#)?; diff --git a/design/api.md b/design/api.md index 8c5cf9e..bd47877 100644 --- a/design/api.md +++ b/design/api.md @@ -525,52 +525,38 @@ This represents the following observations: ### `POST /api/signals` -**status: unimplemented** - Alters the state of a signal. -Signal state can be broken into two parts: observed history and predicted -future. Observed history is written to the database on next flush. -Predicted future is only retained in RAM and returned on `GET` requests on the -near future. This avoids having to display the "unknown" state when it's very -likely that the state has not changed since the most recent update. - -A typical request will add observed history from the time of a recent previous -prediction until now, and add another prediction. Following `GET /api/signals` -requests will return the new prediction until a pre-determined expiration -time. The client typically will send a following request before this -expiration time so that history is recorded and the UI never displays -`unknown` when the signal is being actively managed. +A typical client might be a subscriber of a camera's built-in motion +detection event stream or of a security system's zone status event stream. +It makes a request on every event or on every 30 second timeout, predicting +that the state will last for a minute. This prediction may be changed later. +Writing to the near future in this way ensures that the UI never displays +`unknown` when the client is actively managing the signal. Some requests may instead backfill earlier history, such as when a video analytics client starts up and analyzes all video segments recorded since it -last ran. These will specify beginning and end times for the observed history -and not make a prediction. +last ran. These will specify beginning and end times. The request should have an `application/json` body describing the change to make. It should be a dict with these attributes: -* `signalIds`: a list of signal ids to change. -* `observedStates`: (optional) a list (one entry per `signalIds` entry) of - observed states to set. -* `predictedStates`: (optional) a list (one entry per `signalIds` entry) - of predictions to make. If absent, assumed to match `observedStates`. - Only used if `predictedDur90k` is non-zero. -* `observedStartTime90k` (optional): if absent, assumed to be now. Otherwise - is typically a time from an earlier response. -* `observedEndTime90k` (optional): if absent, assumed to be now. -* `predictionDur90k` (optional): (only allowed when `endTime90k` is absent) - additional time in which the current state should seem to "linger" if no - further updates are received. a `GET /api/signals` request until this time - will reflect the curent - between the time this request +* `signalIds`: a list of signal ids to change. Must be sorted. +* `states`: a list (one per `signalIds` entry) of states to set. +* `startTime90k`: (optional) The start of the observation in 90 kHz units + since 1970-01-01 00:00:00 UTC; commonly taken from an earlier response. If + absent, assumed to be now. +* `endBase`: if `epoch`, `relEndTime90k` is relative to 1970-01-01 00:00:00 + UTC. If `now`, epoch is relative to the current time. +* `relEndTime90k` (optional): The end of the observation, relative to the + specified base. Note this time is allowed to be in the future. The response will be an `application/json` body dict with the following attributes: -* `time90k`: the current time. When the request's `observedStartTime90k` - and/or `observedEndTime90k` were absent, this is needed to later upgrade - predictions to history seamlessly. +* `time90k`: the current time. When the request's `startTime90k` is absent + and/or its `endBase` is `now`, this is needed to know the effect of the + earlier request. Example request sequence: @@ -584,9 +570,10 @@ Request: ```json { - 'signalIds': [1], - 'predictedStates': [2], - 'predictionDur90k': 5400000 + "signalIds": [1], + "states": [2], + "endBase": "now", + "relEndTime90k": 5400000 } ``` @@ -594,23 +581,23 @@ Response: ```json { - 'time90k': 140067468000000 + "time90k": 140067468000000 } ``` #### Request 2 30 seconds later (half the prediction interval), the client still observes -motion. It records the previous prediction and predicts the motion will continue. +motion. It leaves the prior data alone and predicts the motion will continue. Request: ```json { - 'signalIds': [1], - 'observedStates': [2], - 'observedStartTime90k': 140067468000000, - 'predictionDur90k': 5400000 + "signalIds": [1], + "states": [2], + "endBase": "now", + "relEndTime90k": 5400000 } ``` @@ -618,24 +605,24 @@ Response: ```json { - 'time90k': 140067470700000 + "time90k": 140067470700000 } ``` ### Request 3 -5 seconds later, the client observes motion has ended. It records the history -and predicts no more motion. +5 seconds later, the client observes motion has ended. It leaves the prior +data alone and predicts no more motion. Request: ```json { - 'signalIds': [1], - 'observedStates': [2], - 'predictedStates': [1], - 'observedStartTime90k': 140067470700000, - 'predictionDur90k': 5400000 + "signalIds": [1], + "states": [2], + "endBase": "now", + "relEndTime90k": 5400000 + } } ``` @@ -643,7 +630,7 @@ Response: ```json { - 'time90k': 140067471150000 + "time90k": 140067471150000 } ``` diff --git a/src/web.rs b/src/web.rs index a606bac..a4bead4 100644 --- a/src/web.rs +++ b/src/web.rs @@ -297,7 +297,7 @@ impl ServiceInner { fn stream_recordings(&self, req: &Request<::hyper::Body>, uuid: Uuid, type_: db::StreamType) -> ResponseResult { let (r, split) = { - let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); + let mut time = recording::Time::min_value() .. recording::Time::max_value(); let mut split = recording::Duration(i64::max_value()); if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { @@ -630,7 +630,7 @@ impl ServiceInner { } fn signals(&self, req: &Request) -> ResponseResult { - let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); + let mut time = recording::Time::min_value() .. recording::Time::max_value(); if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { let (key, value) = (key.borrow(), value.borrow()); @@ -653,7 +653,7 @@ impl ServiceInner { signals.times_90k.push(c.when.0); signals.signal_ids.push(c.signal); signals.states.push(c.state); - }).map_err(internal_server_err)?; + }); serve_json(req, &signals) }