moonfire-nvr/server/db/days.rs

530 lines
16 KiB
Rust
Raw Normal View History

// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
//! In-memory indexes by calendar day.
use crate::recording::{self, Time};
use failure::Error;
use log::{error, trace};
use smallvec::SmallVec;
use std::cmp;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::io::Write;
use std::ops::Range;
use std::str;
/// A calendar day in `YYYY-mm-dd` format.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Key([u8; 10]);
impl Key {
fn new(tm: time::Tm) -> Result<Self, Error> {
let mut s = Key([0u8; 10]);
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
Ok(s)
}
pub fn bounds(&self) -> Range<Time> {
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
my_tm.tm_isdst = -1;
let start = Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let end = Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
start..end
}
}
impl AsRef<str> for Key {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..]).expect("days are always UTF-8")
}
}
pub trait Value: std::fmt::Debug + Default {
type Change: std::fmt::Debug;
/// Applies the given change to this value.
fn apply(&mut self, c: &Self::Change);
fn is_empty(&self) -> bool;
}
/// In-memory state about a particular stream on a particular day.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct StreamValue {
/// The number of recordings that overlap with this day.
pub recordings: i64,
/// The total duration recorded on this day. This can be 0; because frames' durations are taken
/// from the time of the next frame, a recording that ends unexpectedly after a single frame
/// will have 0 duration of that frame and thus the whole recording.
pub duration: recording::Duration,
}
impl Value for StreamValue {
type Change = Self;
fn apply(&mut self, c: &StreamValue) {
self.recordings += c.recordings;
self.duration += c.duration;
}
fn is_empty(&self) -> bool {
self.recordings == 0
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SignalValue {
/// `states[i]` represents the amount of time spent in state `i+1`.
/// (The signal is the unknown state, 0, for the remainder of the time.)
pub states: SmallVec<[u32; 4]>,
}
impl Value for SignalValue {
type Change = SignalChange;
fn apply(&mut self, c: &SignalChange) {
if self.states.len() < usize::try_from(c.new_state).unwrap() {
self.states.resize(c.new_state as usize, 0);
}
if c.new_state > 0 {
// add to new state.
let s = &mut self.states[c.new_state as usize - 1];
let n = s
.checked_add(c.duration)
.unwrap_or_else(|| panic!("add range violation: s={:?} c={:?}", s, c));
*s = n;
}
if c.old_state > 0 {
// remove from old state.
let i = usize::try_from(c.old_state).unwrap() - 1;
assert!(
self.states.len() > i,
"no such old state: s={:?} c={:?}",
self,
c
);
let s = &mut self.states[c.old_state as usize - 1];
let n = s
.checked_sub(c.duration)
.unwrap_or_else(|| panic!("sub range violation: s={:?} c={:?}", s, c));
*s = n;
}
// Normalize.
let mut l = self.states.len();
while l > 0 && self.states[l - 1] == 0 {
l -= 1;
}
self.states.truncate(l);
}
fn is_empty(&self) -> bool {
self.states.is_empty()
}
}
/// A change to a signal within a single day.
#[derive(Debug)]
pub struct SignalChange {
/// The duration of time being altered.
duration: u32,
/// The state of the given range before this change.
old_state: i16,
/// The state of the given range after this change.
new_state: i16,
}
#[derive(Clone, Debug)]
pub struct Map<V: Value>(BTreeMap<Key, V>);
impl<V: Value> Map<V> {
pub fn new() -> Self {
Self(BTreeMap::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn get(&self, k: &Key) -> Option<&V> {
self.0.get(k)
}
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
fn adjust_day(&mut self, day: Key, c: V::Change) {
trace!("adjust_day {} {:?}", day.as_ref(), &c);
use ::std::collections::btree_map::Entry;
match self.0.entry(day) {
Entry::Vacant(e) => e.insert(Default::default()).apply(&c),
Entry::Occupied(mut e) => {
let v = e.get_mut();
v.apply(&c);
if v.is_empty() {
e.remove_entry();
}
}
}
}
}
impl<'a, V: Value> IntoIterator for &'a Map<V> {
type Item = (&'a Key, &'a V);
type IntoIter = std::collections::btree_map::Iter<'a, Key, V>;
fn into_iter(self) -> Self::IntoIter {
self.0.iter()
}
}
impl Map<StreamValue> {
/// Adjusts `self` to reflect the range of the given recording.
/// Note that the specified range may span two days. It will never span more because the maximum
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
pub(crate) fn adjust(&mut self, r: Range<Time>, sign: i64) {
// Find first day key.
let sec = r.start.unix_seconds();
let mut my_tm = time::at(time::Timespec { sec, nsec: 0 });
let day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill first day key from {:?}->{:?}: {}; will ignore.",
r.start, my_tm, e
);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let boundary = my_tm.to_timespec();
let boundary_90k = boundary.sec * recording::TIME_UNITS_PER_SEC;
// Adjust the first day.
let first_day_delta = StreamValue {
recordings: sign,
duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
};
self.adjust_day(day, first_day_delta);
if r.end.0 <= boundary_90k {
return;
}
// Fill day with the second day. This requires a normalized representation so recalculate.
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that
// result.)
let my_tm = time::at(boundary);
let day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill second day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
let second_day_delta = StreamValue {
recordings: sign,
duration: recording::Duration(sign * (r.end.0 - boundary_90k)),
};
self.adjust_day(day, second_day_delta);
}
}
#[cfg(test)]
impl Map<SignalValue> {
/// Adjusts `self` to reflect the range of the given recording.
/// Note that the specified range may span several days (unlike StreamValue).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
pub(crate) fn adjust(&mut self, mut r: Range<i64>, old_state: i16, new_state: i16) {
// Find first day key.
let mut my_tm = time::at(time::Timespec {
sec: r.start,
nsec: 0,
});
let mut day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill first day key from {:?}->{:?}: {}; will ignore.",
r.start, my_tm, e
);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
loop {
my_tm.tm_mday += 1;
let boundary = my_tm.to_timespec().sec;
// Adjust this day.
let duration = cmp::min(r.end, boundary) - r.start;
assert!(0 <= duration && duration < i32::max_value().into());
self.adjust_day(
day,
SignalChange {
old_state,
new_state,
duration: u32::try_from(duration).unwrap(),
},
);
if r.end <= boundary {
return;
}
// Fill day with the next day. This requires a normalized representation so
// recalculate. (The C mktime(3) already normalized for us once, but .to_timespec()
// discarded that result.)
let my_tm = time::at(time::Timespec {
sec: boundary,
nsec: 0,
});
day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
r.start = boundary;
}
}
}
#[cfg(test)]
mod tests {
use super::{Key, Map, SignalValue, StreamValue};
use crate::recording::{self, TIME_UNITS_PER_SEC};
use crate::testutil;
use smallvec::smallvec;
#[test]
fn test_adjust_stream() {
testutil::init();
let mut m: Map<StreamValue> = Map::new();
// Create a day.
let test_time = recording::Time(130647162600000i64); // 2015-12-31 23:59:00 (Pacific).
let one_min = recording::Duration(60 * TIME_UNITS_PER_SEC);
let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC);
let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC);
let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC);
let test_day1 = &Key(*b"2015-12-31");
let test_day2 = &Key(*b"2016-01-01");
m.adjust(test_time..test_time + one_min, 1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Add to a day.
m.adjust(test_time..test_time + one_min, 1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
// Subtract from a day.
m.adjust(test_time..test_time + one_min, -1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Remove a day.
m.adjust(test_time..test_time + one_min, -1);
assert_eq!(0, m.len());
// Create two days.
m.adjust(test_time..test_time + three_min, 1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Add to two days.
m.adjust(test_time..test_time + three_min, 1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: four_min
}),
m.get(test_day2)
);
// Subtract from two days.
m.adjust(test_time..test_time + three_min, -1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Remove two days.
m.adjust(test_time..test_time + three_min, -1);
assert_eq!(0, m.len());
}
#[test]
fn test_adjust_signal() {
testutil::init();
let mut m: Map<SignalValue> = Map::new();
let test_sec = 1451635140i64; // 2015-12-31 23:59:00 (Pacific).
const SEC_PER_MIN: i64 = 60;
const SEC_PER_HOUR: i64 = 60 * SEC_PER_MIN;
let test_day1 = &Key(*b"2015-12-31");
let test_day2 = &Key(*b"2016-01-01");
let test_day3 = &Key(*b"2016-01-02");
m.adjust(test_sec..test_sec + 30 * SEC_PER_HOUR, 0, 3);
assert_eq!(3, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, SEC_PER_MIN as u32],
})
);
assert_eq!(
m.get(test_day2),
Some(&SignalValue {
states: smallvec![0, 0, (24 * SEC_PER_HOUR) as u32],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, (5 * SEC_PER_HOUR + 59 * SEC_PER_MIN) as u32],
})
);
m.adjust(1451635200..1451721600, 3, 1); // entire 2016-01-01
assert_eq!(3, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, SEC_PER_MIN as u32],
})
);
assert_eq!(
m.get(test_day2),
Some(&SignalValue {
states: smallvec![(24 * SEC_PER_HOUR) as u32],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, (5 * SEC_PER_HOUR + 59 * SEC_PER_MIN) as u32],
})
);
m.adjust(1451635200..1451721600, 1, 0); // entire 2016-01-01
assert_eq!(2, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, SEC_PER_MIN as u32],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, (5 * SEC_PER_HOUR + 59 * SEC_PER_MIN) as u32],
})
);
}
#[test]
fn test_day_bounds() {
testutil::init();
assert_eq!(
Key(*b"2017-10-10").bounds(), // normal day (24 hrs)
recording::Time(135685692000000)..recording::Time(135693468000000)
);
assert_eq!(
Key(*b"2017-03-12").bounds(), // spring forward (23 hrs)
recording::Time(134037504000000)..recording::Time(134044956000000)
);
assert_eq!(
Key(*b"2017-11-05").bounds(), // fall back (25 hrs)
recording::Time(135887868000000)..recording::Time(135895968000000)
);
}
}