Besides being more clear about what belongs to which, this helps with docker caching. The server and ui parts are only rebuilt when their respective subdirectories change. Extend this a bit further by making the webpack build not depend on the target architecture. And adding cache dirs so parts of the server and ui build process can be reused when layer-wide caching fails.
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2019 The Moonfire NVR Authors
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use base::bail_t;
use crate::coding;
use crate::db::FromSqlUuid;
use crate::recording;
use failure::{Error, bail, format_err};
use fnv::FnvHashMap;
use log::debug;
use rusqlite::{Connection, Transaction, params};
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use std::ops::Range;
use uuid::Uuid;
/// All state associated with signals. This is the entry point to this module.
pub(crate) struct State {
signals_by_id: BTreeMap<u32, Signal>,
/// All types with known states. Note that currently there's no requirement an entry here
/// exists for every `type_` specified in a `Signal`, and there's an implied `0` (unknown)
/// state for every `Type`.
types_by_uuid: FnvHashMap<Uuid, Type>,
points_by_time: BTreeMap<recording::Time, Point>,
/// Times which need to be flushed to the database.
/// These either have a matching `points_by_time` entry or represent a removal.
dirty_by_time: BTreeSet<recording::Time>,
max_signal_changes: Option<i64>,
/// Representation of all signals at a point in time.
/// Each point matches a `signal_change` table row (when flushed). However, the in-memory
/// representation keeps not only the changes as of that time but also the complete prior state.
struct Point {
/// All data associated with the point.
/// `data[0..changes_off]` represents previous state (immediately prior to this point).
/// `data[changes_off..]` represents the changes at this point.
/// This representation could be 8 bytes shorter on 64-bit platforms by using a u32 for the
/// lengths, but this would require some unsafe code.
/// The serialized form stored here must always be valid.
data: Box<[u8]>,
changes_off: usize,
impl Point {
/// Creates a new point from `prev` and `changes`.
/// The caller is responsible for validation. In particular, `changes` must be a valid
/// serialized form.
fn new(prev: &BTreeMap<u32, u16>, changes: &[u8]) -> Self {
let mut data = Vec::with_capacity(3 * prev.len() + changes.len());
append_serialized(prev, &mut data);
let changes_off = data.len();
Point {
data: data.into_boxed_slice(),
fn swap(&mut self, other: &mut Point) {
std::mem::swap(&mut self.data, &mut other.data);
std::mem::swap(&mut self.changes_off, &mut other.changes_off);
/// Returns an iterator over state as of immediately before this point.
fn prev(&self) -> PointDataIterator {
/// Returns an iterator over changes in this point.
fn changes(&self) -> PointDataIterator {
/// Returns a mapping of signals to states immediately after this point.
fn after(&self) -> BTreeMap<u32, u16> {
let mut after = BTreeMap::new();
let mut it = self.prev();
while let Some((signal, state)) = it.next().expect("in-mem prev is valid") {
after.insert(signal, state);
let mut it = self.changes();
while let Some((signal, state)) = it.next().expect("in-mem changes is valid") {
if state == 0 {
} else {
after.insert(signal, state);
/// Appends a serialized form of `from` into `to`.
/// `from` must be an iterator of `(signal, state)` with signal numbers in monotonically increasing
/// order.
fn append_serialized<'a, I>(from: I, to: &mut Vec<u8>)
where I: IntoIterator<Item = (&'a u32, &'a u16)> {
let mut next_allowed = 0;
for (&signal, &state) in from.into_iter() {
assert!(signal >= next_allowed);
coding::append_varint32(signal - next_allowed, to);
coding::append_varint32(state as u32, to);
next_allowed = signal + 1;
fn serialize(from: &BTreeMap<u32, u16>) -> Vec<u8> {
let mut to = Vec::with_capacity(3 * from.len());
append_serialized(from, &mut to);
struct PointDataIterator<'a> {
data: &'a [u8],
cur_pos: usize,
cur_signal: u32,
impl<'a> PointDataIterator<'a> {
fn new(data: &'a [u8]) -> Self {
PointDataIterator {
cur_pos: 0,
cur_signal: 0,
/// Returns an error, `None`, or `Some((signal, state))`.
/// Note that errors should be impossible on in-memory data; this returns `Result` for
/// validating blobs as they're read from the database.
fn next(&mut self) -> Result<Option<(u32, u16)>, Error> {
if self.cur_pos == self.data.len() {
return Ok(None);
let (signal_delta, p) = coding::decode_varint32(self.data, self.cur_pos)
.map_err(|()| format_err!("varint32 decode failure; data={:?} pos={}",
self.data, self.cur_pos))?;
let (state, p) = coding::decode_varint32(self.data, p)
.map_err(|()| format_err!("varint32 decode failure; data={:?} pos={}",
self.data, p))?;
let signal = self.cur_signal.checked_add(signal_delta)
.ok_or_else(|| format_err!("signal overflow: {} + {}",
self.cur_signal, signal_delta))?;
if state > u16::max_value() as u32 {
bail!("state overflow: {}", state);
self.cur_pos = p;
self.cur_signal = signal + 1;
Ok(Some((signal, state as u16)))
fn to_map(mut self) -> Result<BTreeMap<u32, u16>, Error> {
let mut out = BTreeMap::new();
while let Some((signal, state)) = self.next()? {
out.insert(signal, state);
/// Representation of a `signal_camera` row.
/// `signal_id` is implied by the `Signal` which owns this struct.
pub struct SignalCamera {
pub camera_id: i32,
pub type_: SignalCameraType,
/// Representation of the `type` field in a `signal_camera` row.
pub enum SignalCameraType {
Direct = 0,
Indirect = 1,
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ListStateChangesRow {
pub when: recording::Time,
pub signal: u32,
pub state: u16,
impl State {
pub fn init(conn: &Connection) -> Result<Self, Error> {
let max_signal_changes: Option<i64> =
conn.query_row("select max_signal_changes from meta", params![], |row| row.get(0))?;
let mut signals_by_id = State::init_signals(conn)?;
State::fill_signal_cameras(conn, &mut signals_by_id)?;
Ok(State {
types_by_uuid: State::init_types(conn)?,
points_by_time: State::init_points(conn)?,
dirty_by_time: BTreeSet::new(),
pub fn list_changes_by_time(
&self, desired_time: Range<recording::Time>, f: &mut dyn FnMut(&ListStateChangesRow)) {
// First find the state immediately before. If it exists, include it.
if let Some((&when, p)) = self.points_by_time.range(..desired_time.start).next_back() {
for (&signal, &state) in &p.after() {
f(&ListStateChangesRow {
// Then include changes up to (but not including) the end time.
for (&when, p) in self.points_by_time.range(desired_time.clone()) {
let mut it = p.changes();
while let Some((signal, state)) = it.next().expect("in-mem changes is valid") {
f(&ListStateChangesRow {
pub fn update_signals(
&mut self, when: Range<recording::Time>, signals: &[u32], states: &[u16])
-> Result<(), base::Error> {
// Do input validation before any mutation.
self.update_signals_validate(signals, states)?;
// Follow the std::ops::Range convention of considering a range empty if its start >= end.
// Bailing early in the empty case isn't just an optimization; apply_observation_end would
// be incorrect otherwise.
if when.end <= when.start {
return Ok(());
// Apply the end before the start so that the `prev` state can be examined.
self.update_signals_end(when.end, signals, states);
self.update_signals_start(when.start, signals, states);
self.update_signals_middle(when, signals, states);
/// Performs garbage collection if the number of points exceeds `max_signal_changes`.
fn gc(&mut self) {
let max = match self.max_signal_changes {
None => return,
Some(m) if m < 0 => 0 as usize,
Some(m) if m > (isize::max_value() as i64) => return,
Some(m) => m as usize,
let to_remove = match self.points_by_time.len().checked_sub(max) {
None => return,
Some(p) => p,
debug!("Performing signal GC: have {} points, want only {}, so removing {}",
self.points_by_time.len(), max, to_remove);
let remove: smallvec::SmallVec<[recording::Time; 4]> =
self.points_by_time.keys().take(to_remove).map(|p| *p).collect();
for p in &remove {
/// Helper for `update_signals` to do validation.
fn update_signals_validate(&self, signals: &[u32], states: &[u16]) -> Result<(), base::Error> {
if signals.len() != states.len() {
bail_t!(InvalidArgument, "signals and states must have same length");
let mut next_allowed = 0u32;
for (&signal, &state) in signals.iter().zip(states) {
if signal < next_allowed {
bail_t!(InvalidArgument, "signals must be monotonically increasing");
match self.signals_by_id.get(&signal) {
None => bail_t!(InvalidArgument, "unknown signal {}", signal),
Some(ref s) => {
let empty = Vec::new();
let states = self.types_by_uuid.get(&s.type_)
.map(|t| &t.states)
if state != 0 && states.binary_search_by_key(&state, |s| s.value).is_err() {
bail_t!(FailedPrecondition, "signal {} specifies unknown state {}",
signal, state);
next_allowed = signal + 1;
/// Helper for `update_signals` to apply the end point.
fn update_signals_end(&mut self, end: recording::Time, signals: &[u32], states: &[u16]) {
let mut prev;
let mut changes = BTreeMap::<u32, u16>::new();
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=end).next_back() {
if t == end {
// Already have a point at end. Adjust it. prev starts unchanged...
prev = p.prev().to_map().expect("in-mem prev is valid");
// ...and then prev and changes are altered to reflect the desired update.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
// If this doesn't alter the new state, don't dirty the database.
if changes.is_empty() {
// Any existing changes should still be applied. They win over reverting to prev.
let mut it = p.changes();
while let Some((signal, state)) = it.next().expect("in-mem changes is valid") {
changes.entry(signal).and_modify(|e| *e = state).or_insert(state);
p.swap(&mut Point::new(&prev, &serialize(&changes)));
// Don't have a point at end, but do have previous state.
prev = p.after();
} else {
// No point at or before end. Start from scratch (all signals unknown).
prev = BTreeMap::new();
// Create a new end point if necessary.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
if changes.is_empty() {
self.points_by_time.insert(end, Point::new(&prev, &serialize(&changes)));
/// Helper for `update_signals_end`. Adjusts `prev` (the state prior to the end point) to
/// reflect the desired update (in `signals` and `states`). Adjusts `changes` (changes to
/// execute at the end point) to undo the change.
fn update_signals_end_maps(signals: &[u32], states: &[u16], prev: &mut BTreeMap<u32, u16>,
changes: &mut BTreeMap<u32, u16>) {
for (&signal, &state) in signals.iter().zip(states) {
match prev.entry(signal) {
Entry::Vacant(e) => {
changes.insert(signal, 0);
Entry::Occupied(mut e) => {
if state == 0 {
changes.insert(signal, *e.get());
} else if *e.get() != state {
changes.insert(signal, *e.get());
*e.get_mut() = state;
/// Helper for `update_signals` to apply the start point.
fn update_signals_start(&mut self, start: recording::Time, signals: &[u32], states: &[u16]) {
let prev;
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=start).next_back() {
if t == start {
// Reuse existing point at start.
prev = p.prev().to_map().expect("in-mem prev is valid");
let mut changes = p.changes().to_map().expect("in-mem changes is valid");
let mut dirty = false;
for (&signal, &state) in signals.iter().zip(states) {
match changes.entry(signal) {
Entry::Occupied(mut e) => {
if *e.get() != state {
dirty = true;
if state == *prev.get(&signal).unwrap_or(&0) {
} else {
*e.get_mut() = state;
Entry::Vacant(e) => {
if signal != 0 {
dirty = true;
if dirty {
p.swap(&mut Point::new(&prev, &serialize(&changes)));
// Create new point at start, using state from previous point.
prev = p.after();
} else {
// Create new point at start, from scratch.
prev = BTreeMap::new();
let mut changes = BTreeMap::new();
for (&signal, &state) in signals.iter().zip(states) {
if state != *prev.get(&signal).unwrap_or(&0) {
changes.insert(signal, state);
if changes.is_empty() {
self.points_by_time.insert(start, Point::new(&prev, &serialize(&changes)));
/// Helper for `update_signals` to apply all points in `(when.start, when.end)`.
fn update_signals_middle(&mut self, when: Range<recording::Time>, signals: &[u32],
states: &[u16]) {
let mut to_delete = Vec::new();
let after_start = recording::Time(when.start.0+1);
for (&t, ref mut p) in self.points_by_time.range_mut(after_start..when.end) {
let mut prev = p.prev().to_map().expect("in-mem prev is valid");
// Update prev to reflect desired update.
for (&signal, &state) in signals.iter().zip(states) {
match prev.entry(signal) {
Entry::Occupied(mut e) => {
if state == 0 {
} else if *e.get() != state {
*e.get_mut() = state;
Entry::Vacant(e) => {
if state != 0 {
// Trim changes to omit any change to signals.
let mut changes = Vec::with_capacity(3*signals.len());
let mut it = p.changes();
let mut next_allowed = 0;
let mut dirty = false;
while let Some((signal, state)) = it.next().expect("in-memory changes is valid") {
if signals.binary_search(&signal).is_ok() { // discard.
dirty = true;
} else { // keep.
assert!(signal >= next_allowed);
coding::append_varint32(signal - next_allowed, &mut changes);
coding::append_varint32(state as u32, &mut changes);
next_allowed = signal + 1;
if changes.is_empty() {
} else {
p.swap(&mut Point::new(&prev, &changes));
if dirty {
// Delete any points with no more changes.
for &t in &to_delete {
self.points_by_time.remove(&t).expect("point exists");
/// Flushes all pending database changes to the given transaction.
/// The caller is expected to call `post_flush` afterward if the transaction is
/// successfully committed. No mutations should happen between these calls.
pub fn flush(&mut self, tx: &Transaction) -> Result<(), Error> {
let mut i_stmt = tx.prepare(r#"
insert or replace into signal_change (time_90k, changes) values (?, ?)
let mut d_stmt = tx.prepare(r#"
delete from signal_change where time_90k = ?
for &t in &self.dirty_by_time {
match self.points_by_time.entry(t) {
Entry::Occupied(ref e) => {
let p = e.get();
Entry::Vacant(_) => {
/// Marks that the previous `flush` was completed successfully.
/// See notes there.
pub fn post_flush(&mut self) {
fn init_signals(conn: &Connection) -> Result<BTreeMap<u32, Signal>, Error> {
let mut signals = BTreeMap::new();
let mut stmt = conn.prepare(r#"
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let id = row.get(0)?;
let source: FromSqlUuid = row.get(1)?;
let type_: FromSqlUuid = row.get(2)?;
signals.insert(id, Signal {
source: source.0,
type_: type_.0,
short_name: row.get(3)?,
cameras: Vec::new(),
fn init_points(conn: &Connection) -> Result<BTreeMap<recording::Time, Point>, Error> {
let mut stmt = conn.prepare(r#"
order by time_90k
let mut rows = stmt.query(params![])?;
let mut points = BTreeMap::new();
let mut cur = BTreeMap::new(); // latest signal -> state, where state != 0
while let Some(row) = rows.next()? {
let time_90k = recording::Time(row.get(0)?);
let changes = row.get_raw_checked(1)?.as_blob()?;
let mut it = PointDataIterator::new(changes);
while let Some((signal, state)) = it.next()? {
if state == 0 {
} else {
cur.insert(signal, state);
points.insert(time_90k, Point::new(&cur, changes));
/// Fills the `cameras` field of the `Signal` structs within the supplied `signals`.
fn fill_signal_cameras(conn: &Connection, signals: &mut BTreeMap<u32, Signal>)
-> Result<(), Error> {
let mut stmt = conn.prepare(r#"
order by signal_id, camera_id
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let signal_id = row.get(0)?;
let s = signals.get_mut(&signal_id)
.ok_or_else(|| format_err!("signal_camera row for unknown signal id {}",
let type_ = row.get(2)?;
s.cameras.push(SignalCamera {
camera_id: row.get(1)?,
type_: match type_ {
0 => SignalCameraType::Direct,
1 => SignalCameraType::Indirect,
_ => bail!("unknown signal_camera type {}", type_),
fn init_types(conn: &Connection) -> Result<FnvHashMap<Uuid, Type>, Error> {
let mut types = FnvHashMap::default();
let mut stmt = conn.prepare(r#"
order by type_uuid, value
let mut rows = stmt.query(params![])?;
while let Some(row) = rows.next()? {
let type_: FromSqlUuid = row.get(0)?;
types.entry(type_.0).or_insert_with(Type::default).states.push(TypeState {
value: row.get(1)?,
name: row.get(2)?,
motion: row.get(3)?,
color: row.get(4)?,
pub fn signals_by_id(&self) -> &BTreeMap<u32, Signal> { &self.signals_by_id }
pub fn types_by_uuid(&self) -> &FnvHashMap<Uuid, Type> { & self.types_by_uuid }
/// Representation of a `signal` row.
pub struct Signal {
pub id: u32,
pub source: Uuid,
pub type_: Uuid,
pub short_name: String,
/// The cameras this signal is associated with. Sorted by camera id, which is unique.
pub cameras: Vec<SignalCamera>,
/// Representation of a `signal_type_enum` row.
/// `type_uuid` is implied by the `Type` which owns this struct.
pub struct TypeState {
pub value: u16,
pub name: String,
pub motion: bool,
pub color: String,
/// Representation of a signal type; currently this just gathers together the TypeStates.
#[derive(Debug, Default)]
pub struct Type {
/// The possible states associated with this type. They are sorted by value, which is unique.
pub states: Vec<TypeState>,
mod tests {
use crate::{db, testutil};
use rusqlite::Connection;
use super::*;
fn test_point_data_it() {
// Example taken from the .sql file.
let data = b"\x01\x01\x01\x01\xc4\x01\x02";
let mut it = super::PointDataIterator::new(data);
assert_eq!(it.next().unwrap(), Some((1, 1)));
assert_eq!(it.next().unwrap(), Some((3, 1)));
assert_eq!(it.next().unwrap(), Some((200, 2)));
assert_eq!(it.next().unwrap(), None);
fn test_empty_db() {
let mut conn = Connection::open_in_memory().unwrap();
db::init(&mut conn).unwrap();
let s = State::init(&conn).unwrap();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |_r| panic!("no changes expected"));
fn round_trip() {
let mut conn = Connection::open_in_memory().unwrap();
db::init(&mut conn).unwrap();
update meta set max_signal_changes = 2;
insert into signal (id, source_uuid, type_uuid, short_name)
values (1, x'1B3889C0A59F400DA24C94EBEB19CC3A',
x'EE66270FD9C648198B339720D4CBCA6B', 'a'),
(2, x'A4A73D9A53424EBCB9F6366F1E5617FA',
x'EE66270FD9C648198B339720D4CBCA6B', 'b');
insert into signal_type_enum (type_uuid, value, name, motion, color)
values (x'EE66270FD9C648198B339720D4CBCA6B', 1, 'still', 0, 'black'),
(x'EE66270FD9C648198B339720D4CBCA6B', 2, 'moving', 1, 'red');
let mut s = State::init(&conn).unwrap();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |_r| panic!("no changes expected"));
const START: recording::Time = recording::Time(140067462600000); // 2019-04-26T11:59:00
const NOW: recording::Time = recording::Time(140067468000000); // 2019-04-26T12:00:00
s.update_signals(START..NOW, &[1, 2], &[2, 1]).unwrap();
let mut rows = Vec::new();
const EXPECTED: &[ListStateChangesRow] = &[
ListStateChangesRow {
when: START,
signal: 1,
state: 2,
ListStateChangesRow {
when: START,
signal: 2,
state: 1,
ListStateChangesRow {
when: NOW,
signal: 1,
state: 0,
ListStateChangesRow {
when: NOW,
signal: 2,
state: 0,
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED);
let tx = conn.transaction().unwrap();
let mut s = State::init(&conn).unwrap();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED);
// Go through it again. This time, hit the max number of signals, forcing START to be
// dropped.
const SOON: recording::Time = recording::Time(140067473400000); // 2019-04-26T12:01:00
s.update_signals(NOW..SOON, &[1, 2], &[1, 2]).unwrap();
const EXPECTED2: &[ListStateChangesRow] = &[
ListStateChangesRow {
when: NOW,
signal: 1,
state: 1,
ListStateChangesRow {
when: NOW,
signal: 2,
state: 2,
ListStateChangesRow {
when: SOON,
signal: 1,
state: 0,
ListStateChangesRow {
when: SOON,
signal: 2,
state: 0,
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED2);
let tx = conn.transaction().unwrap();
let s = State::init(&conn).unwrap();
s.list_changes_by_time(recording::Time::min_value() .. recording::Time::max_value(),
&mut |r| rows.push(*r));
assert_eq!(&rows[..], EXPECTED2);