present signal days in API requests

I also enforced some invariants in the signals code, fixing a couple
bugs. The signals code is more complex than I'd like, but hopefully
is working now.
This commit is contained in:
Scott Lamb
2021-03-23 20:22:29 -07:00
parent caf65a045b
commit abcd650304
6 changed files with 257 additions and 47 deletions

View File

@@ -281,6 +281,13 @@ impl fmt::Display for Duration {
}
}
impl ops::Mul<i64> for Duration {
type Output = Self;
fn mul(self, rhs: i64) -> Self::Output {
Duration(self.0 * rhs)
}
}
impl ops::Add for Duration {
type Output = Duration;
fn add(self, rhs: Duration) -> Duration {

View File

@@ -16,8 +16,8 @@ use std::ops::Range;
use std::str;
/// A calendar day in `YYYY-mm-dd` format.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Key([u8; 10]);
#[derive(Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct Key(pub(crate) [u8; 10]);
impl Key {
fn new(tm: time::Tm) -> Result<Self, Error> {
@@ -46,6 +46,12 @@ impl AsRef<str> for Key {
}
}
impl std::fmt::Debug for Key {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.as_ref())
}
}
pub trait Value: std::fmt::Debug + Default {
type Change: std::fmt::Debug;
@@ -140,14 +146,14 @@ pub struct SignalChange {
duration: Duration,
/// The state of the given range before this change.
old_state: i16,
old_state: u16,
/// The state of the given range after this change.
new_state: i16,
new_state: u16,
}
#[derive(Clone, Debug)]
pub struct Map<V: Value>(BTreeMap<Key, V>);
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Map<V: Value>(pub(crate) BTreeMap<Key, V>);
impl<V: Value> Map<V> {
pub fn new() -> Self {
@@ -253,14 +259,13 @@ impl Map<StreamValue> {
}
}
#[cfg(test)]
impl Map<SignalValue> {
/// Adjusts `self` to reflect the range of the given recording.
/// Note that the specified range may span several days (unlike StreamValue).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
pub(crate) fn adjust(&mut self, mut r: Range<Time>, old_state: i16, new_state: i16) {
pub(crate) fn adjust(&mut self, mut r: Range<Time>, old_state: u16, new_state: u16) {
// Find first day key.
let sec = r.start.unix_seconds();
let mut my_tm = time::at(time::Timespec { sec, nsec: 0 });

View File

@@ -412,6 +412,7 @@ create table user_session (
create index user_session_uid on user_session (user_id);
-- Timeseries with an enum value.
create table signal (
id integer primary key,
@@ -427,8 +428,8 @@ create table signal (
-- uuids, such as "Elk security system watcher".
type_uuid blob not null check (length(type_uuid) = 16),
-- a short human-readable description of the event to use in mouseovers or event
-- lists, such as "driveway motion" or "front door open".
-- a short human-readable description to use in mouseovers or event lists,
-- such as "driveway motion" or "front door open".
short_name not null,
unique (source_uuid, type_uuid)

View File

@@ -2,9 +2,9 @@
// Copyright (C) 2019 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::coding;
use crate::db::FromSqlUuid;
use crate::recording;
use crate::{coding, days};
use base::bail_t;
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
@@ -24,6 +24,11 @@ pub(crate) struct State {
/// state for every `Type`.
types_by_uuid: FnvHashMap<Uuid, Type>,
/// All points in time.
/// Invariants, checked by `State::debug_assert_point_invariants`:
/// * the first point must have an empty previous state (all signals at state 0).
/// * each point's prev state matches the previous point's after state.
/// * the last point must have an empty final state (all signals changed to state 0).
points_by_time: BTreeMap<recording::Time, Point>,
/// Times which need to be flushed to the database.
@@ -89,14 +94,7 @@ impl Point {
while let Some((signal, state)) = it.next().expect("in-mem prev is valid") {
after.insert(signal, state);
}
let mut it = self.changes();
while let Some((signal, state)) = it.next().expect("in-mem changes is valid") {
if state == 0 {
after.remove(&signal);
} else {
after.insert(signal, state);
}
}
self.changes().update_map(&mut after);
after
}
}
@@ -173,6 +171,16 @@ impl<'a> PointDataIterator<'a> {
}
Ok(out)
}
fn update_map(mut self, m: &mut BTreeMap<u32, u16>) {
while let Some((signal, state)) = self.next().expect("in-mem changes is valid") {
if state == 0 {
m.remove(&signal);
} else {
m.insert(signal, state);
}
}
}
}
/// Representation of a `signal_camera` row.
@@ -205,13 +213,17 @@ impl State {
})?;
let mut signals_by_id = State::init_signals(conn)?;
State::fill_signal_cameras(conn, &mut signals_by_id)?;
Ok(State {
let mut points_by_time = BTreeMap::new();
State::fill_points(conn, &mut points_by_time, &mut signals_by_id)?;
let s = State {
max_signal_changes,
signals_by_id,
types_by_uuid: State::init_types(conn)?,
points_by_time: State::init_points(conn)?,
points_by_time,
dirty_by_time: BTreeSet::new(),
})
};
s.debug_assert_point_invariants();
Ok(s)
}
pub fn list_changes_by_time(
@@ -260,9 +272,10 @@ impl State {
}
// Apply the end before the start so that the `prev` state can be examined.
self.update_signals_end(when.end, signals, states);
self.update_signals_end(when.clone(), signals, states);
self.update_signals_start(when.start, signals, states);
self.update_signals_middle(when, signals, states);
self.debug_assert_point_invariants();
self.gc();
Ok(())
@@ -287,16 +300,50 @@ impl State {
to_remove
);
self.gc_days(to_remove);
let remove: smallvec::SmallVec<[recording::Time; 4]> = self
.points_by_time
.keys()
.take(to_remove)
.map(|p| *p)
.map(|t| *t)
.collect();
for p in &remove {
self.points_by_time.remove(p);
self.dirty_by_time.insert(*p);
for t in &remove {
self.points_by_time.remove(t);
self.dirty_by_time.insert(*t);
}
// Update the first remaining point to keep state starting from it unchanged.
let (t, p) = match self.points_by_time.iter_mut().next() {
Some(e) => e,
None => return,
};
let combined = p.after();
p.changes_off = 0;
p.data = serialize(&combined).into_boxed_slice();
self.dirty_by_time.insert(*t);
self.debug_assert_point_invariants();
}
/// Adjusts each signal's days index to reflect garbage-collecting the first `to_remove` points.
fn gc_days(&mut self, to_remove: usize) {
let mut it = self.points_by_time.iter().take(to_remove + 1);
let (mut prev_time, mut prev_state) = match it.next() {
None => return, // nothing to do.
Some(p) => (*p.0, p.1.after()),
};
for (&new_time, point) in it {
let mut changes = point.changes();
while let Some((signal, state)) = changes.next().expect("in-mem points valid") {
let s = self
.signals_by_id
.get_mut(&signal)
.expect("in-mem point signals valid");
let prev_state = prev_state.entry(signal).or_default();
s.days.adjust(prev_time..new_time, *prev_state, state);
*prev_state = state;
}
prev_time = new_time;
}
}
@@ -335,16 +382,35 @@ impl State {
}
/// Helper for `update_signals` to apply the end point.
fn update_signals_end(&mut self, end: recording::Time, signals: &[u32], states: &[u16]) {
fn update_signals_end(
&mut self,
when: Range<recording::Time>,
signals: &[u32],
states: &[u16],
) {
let mut prev;
let mut changes = BTreeMap::<u32, u16>::new();
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=end).next_back() {
if t == end {
let prev_t = self
.points_by_time
.range(when.clone())
.next_back()
.map(|e| *e.0)
.unwrap_or(when.start);
let days_range = prev_t..when.end;
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=when.end).next_back() {
if t == when.end {
// Already have a point at end. Adjust it. prev starts unchanged...
prev = p.prev().to_map().expect("in-mem prev is valid");
// ...and then prev and changes are altered to reflect the desired update.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
State::update_signals_end_maps(
signals,
states,
days_range,
&mut self.signals_by_id,
&mut prev,
&mut changes,
);
// If this doesn't alter the new state, don't dirty the database.
if changes.is_empty() {
@@ -372,31 +438,44 @@ impl State {
}
// Create a new end point if necessary.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
State::update_signals_end_maps(
signals,
states,
days_range,
&mut self.signals_by_id,
&mut prev,
&mut changes,
);
if changes.is_empty() {
return;
}
self.dirty_by_time.insert(end);
self.dirty_by_time.insert(when.end);
self.points_by_time
.insert(end, Point::new(&prev, &serialize(&changes)));
.insert(when.end, Point::new(&prev, &serialize(&changes)));
}
/// Helper for `update_signals_end`. Adjusts `prev` (the state prior to the end point) to
/// reflect the desired update (in `signals` and `states`). Adjusts `changes` (changes to
/// execute at the end point) to undo the change.
/// execute at the end point) to undo the change. Adjust each signal's days index for
/// the range from the penultimate point of the range (or lacking that, its start) to the end.
fn update_signals_end_maps(
signals: &[u32],
states: &[u16],
days_range: Range<recording::Time>,
signals_by_id: &mut BTreeMap<u32, Signal>,
prev: &mut BTreeMap<u32, u16>,
changes: &mut BTreeMap<u32, u16>,
) {
for (&signal, &state) in signals.iter().zip(states) {
let old_state;
match prev.entry(signal) {
Entry::Vacant(e) => {
old_state = 0;
changes.insert(signal, 0);
e.insert(state);
}
Entry::Occupied(mut e) => {
old_state = *e.get();
if state == 0 {
changes.insert(signal, *e.get());
e.remove();
@@ -406,6 +485,11 @@ impl State {
}
}
}
signals_by_id
.get_mut(&signal)
.expect("signal valid")
.days
.adjust(days_range.clone(), old_state, state);
}
}
@@ -469,6 +553,7 @@ impl State {
}
/// Helper for `update_signals` to apply all points in `(when.start, when.end)`.
/// This also updates each signal's days index for the points it finds.
fn update_signals_middle(
&mut self,
when: Range<recording::Time>,
@@ -477,13 +562,17 @@ impl State {
) {
let mut to_delete = Vec::new();
let after_start = recording::Time(when.start.0 + 1);
let mut prev_t = when.start;
for (&t, ref mut p) in self.points_by_time.range_mut(after_start..when.end) {
let mut prev = p.prev().to_map().expect("in-mem prev is valid");
// Update prev to reflect desired update.
// Update prev to reflect desired update; likewise each signal's days index.
for (&signal, &state) in signals.iter().zip(states) {
let s = self.signals_by_id.get_mut(&signal).expect("valid signals");
let prev_state;
match prev.entry(signal) {
Entry::Occupied(mut e) => {
prev_state = *e.get();
if state == 0 {
e.remove_entry();
} else if *e.get() != state {
@@ -491,11 +580,14 @@ impl State {
}
}
Entry::Vacant(e) => {
prev_state = 0;
if state != 0 {
e.insert(state);
}
}
}
s.days.adjust(prev_t..t, prev_state, state);
prev_t = t;
}
// Trim changes to omit any change to signals.
@@ -593,13 +685,20 @@ impl State {
type_: type_.0,
short_name: row.get(3)?,
cameras: Vec::new(),
days: days::Map::new(),
},
);
}
Ok(signals)
}
fn init_points(conn: &Connection) -> Result<BTreeMap<recording::Time, Point>, Error> {
/// Fills `points_by_time` from the database, also filling the `days`
/// index of each signal.
fn fill_points(
conn: &Connection,
points_by_time: &mut BTreeMap<recording::Time, Point>,
signals_by_id: &mut BTreeMap<u32, Signal>,
) -> Result<(), Error> {
let mut stmt = conn.prepare(
r#"
select
@@ -611,22 +710,43 @@ impl State {
"#,
)?;
let mut rows = stmt.query(params![])?;
let mut points = BTreeMap::new();
let mut cur = BTreeMap::new(); // latest signal -> state, where state != 0
let mut sig_last_state = BTreeMap::new();
while let Some(row) = rows.next()? {
let time_90k = recording::Time(row.get(0)?);
let changes = row.get_raw_checked(1)?.as_blob()?;
let before = cur.clone();
let mut it = PointDataIterator::new(changes);
while let Some((signal, state)) = it.next()? {
let e = sig_last_state.entry(signal);
if let Entry::Occupied(ref e) = e {
let (prev_time, prev_state) = *e.get();
let s = signals_by_id.get_mut(&signal).ok_or_else(|| {
format_err!("time {} references invalid signal {}", time_90k, signal)
})?;
s.days.adjust(prev_time..time_90k, 0, prev_state);
}
if state == 0 {
cur.remove(&signal);
if let Entry::Occupied(e) = e {
e.remove_entry();
}
} else {
cur.insert(signal, state);
*e.or_default() = (time_90k, state);
}
}
points.insert(time_90k, Point::new(&cur, changes));
points_by_time.insert(time_90k, Point::new(&before, changes));
}
Ok(points)
if !cur.is_empty() {
bail!(
"far future state should be unknown for all signals; is: {:?}",
cur
);
}
Ok(())
}
/// Fills the `cameras` field of the `Signal` structs within the supplied `signals`.
@@ -702,6 +822,25 @@ impl State {
pub fn types_by_uuid(&self) -> &FnvHashMap<Uuid, Type> {
&self.types_by_uuid
}
#[cfg(not(debug_assertions))]
fn debug_assert_point_invariants(&self) {}
/// Checks invariants on `points_by_time` (expensive).
#[cfg(debug_assertions)]
fn debug_assert_point_invariants(&self) {
let mut expected_prev = BTreeMap::new();
for (t, p) in self.points_by_time.iter() {
let cur = p.prev().to_map().expect("in-mem prev is valid");
assert_eq!(&expected_prev, &cur, "time {} prev mismatch", t);
p.changes().update_map(&mut expected_prev);
}
assert_eq!(
expected_prev.len(),
0,
"last point final state should be empty"
);
}
}
/// Representation of a `signal` row.
@@ -714,6 +853,8 @@ pub struct Signal {
/// The cameras this signal is associated with. Sorted by camera id, which is unique.
pub cameras: Vec<SignalCamera>,
pub days: days::Map<days::SignalValue>,
}
/// Representation of a `signal_type_enum` row.
@@ -738,6 +879,7 @@ mod tests {
use super::*;
use crate::{db, testutil};
use rusqlite::Connection;
use smallvec::smallvec;
#[test]
fn test_point_data_it() {
@@ -821,6 +963,22 @@ mod tests {
&mut |r| rows.push(*r),
);
assert_eq!(&rows[..], EXPECTED);
let mut expected_days = days::Map::new();
expected_days.0.insert(
days::Key(*b"2019-04-26"),
days::SignalValue {
states: smallvec![0, (NOW - START).0 as u64],
},
);
assert_eq!(&s.signals_by_id.get(&1).unwrap().days, &expected_days);
expected_days.0.clear();
expected_days.0.insert(
days::Key(*b"2019-04-26"),
days::SignalValue {
states: smallvec![(NOW - START).0 as u64],
},
);
assert_eq!(&s.signals_by_id.get(&2).unwrap().days, &expected_days);
{
let tx = conn.transaction().unwrap();

View File

@@ -106,6 +106,10 @@ pub struct Signal<'a> {
pub source: Uuid,
pub type_: Uuid,
pub short_name: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "Signal::serialize_days")]
pub days: Option<&'a db::days::Map<db::days::SignalValue>>,
}
#[derive(Deserialize)]
@@ -275,13 +279,14 @@ impl<'a> Stream<'a> {
}
impl<'a> Signal<'a> {
pub fn wrap(s: &'a db::Signal, db: &'a db::LockedDatabase, _include_days: bool) -> Self {
pub fn wrap(s: &'a db::Signal, db: &'a db::LockedDatabase, include_days: bool) -> Self {
Signal {
id: s.id,
cameras: (s, db),
source: s.source,
type_: s.type_,
short_name: &s.short_name,
days: if include_days { Some(&s.days) } else { None },
}
}
@@ -306,6 +311,30 @@ impl<'a> Signal<'a> {
}
map.end()
}
fn serialize_days<S>(
days: &Option<&db::days::Map<db::days::SignalValue>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let days = match *days {
Some(d) => d,
None => return serializer.serialize_none(),
};
let mut map = serializer.serialize_map(Some(days.len()))?;
for (k, v) in days {
map.serialize_key(k.as_ref())?;
let bounds = k.bounds();
map.serialize_value(&SignalDayValue {
start_time_90k: bounds.start.0,
end_time_90k: bounds.end.0,
states: &v.states[..],
})?;
}
map.end()
}
}
impl<'a> SignalType<'a> {
@@ -347,6 +376,14 @@ struct StreamDayValue {
pub total_duration_90k: i64,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct SignalDayValue<'a> {
pub start_time_90k: i64,
pub end_time_90k: i64,
pub states: &'a [u64],
}
impl<'a> TopLevel<'a> {
/// Serializes cameras as a list (rather than a map), optionally including the `days` and
/// `cameras` fields.