Merge branch 'master' into new-ui

This commit is contained in:
Scott Lamb 2021-03-26 19:29:22 -07:00
commit b0b650b6b9
13 changed files with 862 additions and 351 deletions

View File

@ -6,6 +6,14 @@ changes, see Git history.
Each release is tagged in Git and on the Docker repository
[`scottlamb/moonfire-nvr`](https://hub.docker.com/r/scottlamb/moonfire-nvr).
## `v0.6.3` (in progress)
* Compile fix for nightly rust 2021-03-14 and beyond.
* Fix incorrect `prev_media_duration_90k` calculation. No current impact.
This field is intended to be used in an upcoming scrub bar UI, and when
not calculated properly there might be unexpected gaps or overlaps in
playback.
## `v0.6.2`
* Fix panics when a stream's PTS has extreme jumps

View File

@ -91,7 +91,7 @@ The `application/json` response will have a dict as follows:
this stream. This is slightly more than `totalSampleFileBytes`
because it also includes the wasted portion of the final
filesystem block allocated to each file.
* `days`: (only included if request pararameter `days` is true)
* `days`: (only included if request parameter `days` is true)
dictionary representing calendar days (in the server's time zone)
with non-zero total duration of recordings for that day. Currently
this includes uncommitted and growing recordings. This is likely
@ -118,8 +118,11 @@ The `application/json` response will have a dict as follows:
* `cameras`: a map of associated cameras' UUIDs to the type of association:
`direct` or `indirect`. See `db/schema.sql` for more description.
* `type`: a UUID, expected to match one of `signalTypes`.
* `days`: as in `cameras.streams.days` above.
**status: unimplemented**
* `days`: (only included if request parameter `days` is true) similar to
`cameras.days` above. Values are objects with the following attributes:
* `states`: an array of the time the signal is in each state, starting
from 1. These may not sum to the entire day; if so, the rest of the
day is in state 0 (`unknown`).
* `signalTypes`: a list of all known signal types.
* `uuid`: in text format.
* `states`: a map of all possible states of the enumeration to more
@ -183,7 +186,7 @@ Example response:
"2016-05-01": {
"endTime90k": 131595516000000,
"startTime90k": 131587740000000,
"totalDuration90k": 5400000
"states": [5400000]
}
}
}
@ -487,6 +490,8 @@ followed by by a `.mp4` media segment. The following headers will be included:
* `X-Media-Time-Range`: the relative media start and end times of these
frames within the recording, as a half-open interval.
The server will also send pings, currently at 30-second intervals.
The WebSocket will always open immediately but will receive messages only while the
backing RTSP stream is connected.
@ -577,8 +582,7 @@ Valid request parameters:
This will return the current state as of the latest change (to any signal)
before the start time (if any), then all changes in the interval. This
allows the caller to determine the state at every moment during the
selected timespan, as well as observe all events (even instantaneous
ones).
selected timespan, as well as observe all events.
Responses are several parallel arrays for each observation:

25
server/Cargo.lock generated
View File

@ -1016,13 +1016,13 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.4"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db65c6da02e61f55dae90a0ae427b2a5f6b3e8db09f58d10efab23af92592616"
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
dependencies = [
"arrayvec 0.5.2",
"bitflags",
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"ryu",
"static_assertions",
]
@ -1256,6 +1256,7 @@ dependencies = [
"tempdir",
"time",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"url",
"uuid",
@ -1629,12 +1630,12 @@ dependencies = [
[[package]]
name = "protobuf"
version = "3.0.0-pre"
source = "git+https://github.com/stepancheg/rust-protobuf#5f3ed259acf9ae42014e4e49f7c4d62917685584"
source = "git+https://github.com/stepancheg/rust-protobuf#c27743ae4ce421f5d9a61ef0d1885eface278d6c"
[[package]]
name = "protobuf-codegen"
version = "3.0.0-pre"
source = "git+https://github.com/stepancheg/rust-protobuf#5f3ed259acf9ae42014e4e49f7c4d62917685584"
source = "git+https://github.com/stepancheg/rust-protobuf#c27743ae4ce421f5d9a61ef0d1885eface278d6c"
dependencies = [
"protobuf",
]
@ -1642,7 +1643,7 @@ dependencies = [
[[package]]
name = "protobuf-codegen-pure"
version = "3.0.0-pre"
source = "git+https://github.com/stepancheg/rust-protobuf#5f3ed259acf9ae42014e4e49f7c4d62917685584"
source = "git+https://github.com/stepancheg/rust-protobuf#c27743ae4ce421f5d9a61ef0d1885eface278d6c"
dependencies = [
"protobuf",
"protobuf-codegen",
@ -2354,9 +2355,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.1.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195"
checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722"
dependencies = [
"autocfg",
"bytes",
@ -2374,9 +2375,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.8",
@ -2385,9 +2386,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.2"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [
"futures-core",
"pin-project-lite",

View File

@ -57,7 +57,8 @@ smallvec = "1.0"
structopt = { version = "0.3.13", features = ["default", "wrap_help"] }
sync_wrapper = "0.1.0"
time = "0.1"
tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio-stream = "0.1.5"
tokio-tungstenite = "0.13.0"
url = "2.1.1"
uuid = { version = "0.8", features = ["serde", "std", "v4"] }

View File

@ -281,6 +281,13 @@ impl fmt::Display for Duration {
}
}
impl ops::Mul<i64> for Duration {
type Output = Self;
fn mul(self, rhs: i64) -> Self::Output {
Duration(self.0 * rhs)
}
}
impl ops::Add for Duration {
type Output = Duration;
fn add(self, rhs: Duration) -> Duration {

530
server/db/days.rs Normal file
View File

@ -0,0 +1,530 @@
// This file is part of Moonfire NVR, a security camera network video recorder.
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
//! In-memory indexes by calendar day.
use base::time::{Duration, Time, TIME_UNITS_PER_SEC};
use failure::Error;
use log::{error, trace};
use smallvec::SmallVec;
use std::cmp;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::io::Write;
use std::ops::Range;
use std::str;
/// A calendar day in `YYYY-mm-dd` format.
#[derive(Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct Key(pub(crate) [u8; 10]);
impl Key {
fn new(tm: time::Tm) -> Result<Self, Error> {
let mut s = Key([0u8; 10]);
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
Ok(s)
}
pub fn bounds(&self) -> Range<Time> {
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
my_tm.tm_isdst = -1;
let start = Time(my_tm.to_timespec().sec * TIME_UNITS_PER_SEC);
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let end = Time(my_tm.to_timespec().sec * TIME_UNITS_PER_SEC);
start..end
}
}
impl AsRef<str> for Key {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..]).expect("days are always UTF-8")
}
}
impl std::fmt::Debug for Key {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.as_ref())
}
}
pub trait Value: std::fmt::Debug + Default {
type Change: std::fmt::Debug;
/// Applies the given change to this value.
fn apply(&mut self, c: &Self::Change);
fn is_empty(&self) -> bool;
}
/// In-memory state about a particular stream on a particular day.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct StreamValue {
/// The number of recordings that overlap with this day.
pub recordings: i64,
/// The total duration recorded on this day. This can be 0; because frames' durations are taken
/// from the time of the next frame, a recording that ends unexpectedly after a single frame
/// will have 0 duration of that frame and thus the whole recording.
pub duration: Duration,
}
impl Value for StreamValue {
type Change = Self;
fn apply(&mut self, c: &StreamValue) {
self.recordings += c.recordings;
self.duration += c.duration;
}
fn is_empty(&self) -> bool {
self.recordings == 0
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SignalValue {
/// `states[i]` represents the amount of time spent in state `i+1`.
/// (The signal is the unknown state, 0, for the remainder of the time.)
pub states: SmallVec<[u64; 4]>,
}
impl Value for SignalValue {
type Change = SignalChange;
fn apply(&mut self, c: &SignalChange) {
if self.states.len() < usize::try_from(c.new_state).unwrap() {
self.states.resize(c.new_state as usize, 0);
}
if c.new_state > 0 {
// add to new state.
let s = &mut self.states[c.new_state as usize - 1];
let n = s
.checked_add(u64::try_from(c.duration.0).unwrap())
.unwrap_or_else(|| panic!("add range violation: s={:?} c={:?}", s, c));
*s = n;
}
if c.old_state > 0 {
// remove from old state.
let i = usize::try_from(c.old_state).unwrap() - 1;
assert!(
self.states.len() > i,
"no such old state: s={:?} c={:?}",
self,
c
);
let s = &mut self.states[c.old_state as usize - 1];
let n = s
.checked_sub(u64::try_from(c.duration.0).unwrap())
.unwrap_or_else(|| panic!("sub range violation: s={:?} c={:?}", s, c));
*s = n;
}
// Normalize.
let mut l = self.states.len();
while l > 0 && self.states[l - 1] == 0 {
l -= 1;
}
self.states.truncate(l);
}
fn is_empty(&self) -> bool {
self.states.is_empty()
}
}
/// A change to a signal within a single day.
#[derive(Debug)]
pub struct SignalChange {
/// The duration of time being altered.
duration: Duration,
/// The state of the given range before this change.
old_state: u16,
/// The state of the given range after this change.
new_state: u16,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Map<V: Value>(pub(crate) BTreeMap<Key, V>);
impl<V: Value> Map<V> {
pub fn new() -> Self {
Self(BTreeMap::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn get(&self, k: &Key) -> Option<&V> {
self.0.get(k)
}
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
fn adjust_day(&mut self, day: Key, c: V::Change) {
trace!("adjust_day {} {:?}", day.as_ref(), &c);
use ::std::collections::btree_map::Entry;
match self.0.entry(day) {
Entry::Vacant(e) => e.insert(Default::default()).apply(&c),
Entry::Occupied(mut e) => {
let v = e.get_mut();
v.apply(&c);
if v.is_empty() {
e.remove_entry();
}
}
}
}
}
impl<'a, V: Value> IntoIterator for &'a Map<V> {
type Item = (&'a Key, &'a V);
type IntoIter = std::collections::btree_map::Iter<'a, Key, V>;
fn into_iter(self) -> Self::IntoIter {
self.0.iter()
}
}
impl Map<StreamValue> {
/// Adjusts `self` to reflect the range of the given recording.
/// Note that the specified range may span two days. It will never span more because the maximum
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
pub(crate) fn adjust(&mut self, r: Range<Time>, sign: i64) {
// Find first day key.
let sec = r.start.unix_seconds();
let mut my_tm = time::at(time::Timespec { sec, nsec: 0 });
let day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill first day key from {:?}->{:?}: {}; will ignore.",
r, my_tm, e
);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let boundary = my_tm.to_timespec();
let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC;
// Adjust the first day.
let first_day_delta = StreamValue {
recordings: sign,
duration: Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
};
self.adjust_day(day, first_day_delta);
if r.end.0 <= boundary_90k {
return;
}
// Fill day with the second day. This requires a normalized representation so recalculate.
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that
// result.)
let my_tm = time::at(boundary);
let day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill second day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
let second_day_delta = StreamValue {
recordings: sign,
duration: Duration(sign * (r.end.0 - boundary_90k)),
};
self.adjust_day(day, second_day_delta);
}
}
impl Map<SignalValue> {
/// Adjusts `self` to reflect the range of the given recording.
/// Note that the specified range may span several days (unlike StreamValue).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
pub(crate) fn adjust(&mut self, mut r: Range<Time>, old_state: u16, new_state: u16) {
// Find first day key.
let sec = r.start.unix_seconds();
let mut my_tm = time::at(time::Timespec { sec, nsec: 0 });
let mut day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill first day key from {:?}->{:?}: {}; will ignore.",
r, my_tm, e
);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
loop {
my_tm.tm_mday += 1;
let boundary_90k = my_tm.to_timespec().sec * TIME_UNITS_PER_SEC;
// Adjust this day.
let duration = Duration(cmp::min(r.end.0, boundary_90k) - r.start.0);
self.adjust_day(
day,
SignalChange {
old_state,
new_state,
duration,
},
);
if r.end.0 <= boundary_90k {
return;
}
// Fill day with the next day. This requires a normalized representation so
// recalculate. (The C mktime(3) already normalized for us once, but .to_timespec()
// discarded that result.)
let my_tm = time::at(time::Timespec {
sec: Time(boundary_90k).unix_seconds(),
nsec: 0,
});
day = match Key::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
r.start.0 = boundary_90k;
}
}
}
#[cfg(test)]
mod tests {
use super::{Key, Map, SignalValue, StreamValue};
use crate::testutil;
use base::time::{Duration, Time, TIME_UNITS_PER_SEC};
use smallvec::smallvec;
#[test]
fn test_adjust_stream() {
testutil::init();
let mut m: Map<StreamValue> = Map::new();
// Create a day.
let test_time = Time(130647162600000i64); // 2015-12-31 23:59:00 (Pacific).
let one_min = Duration(60 * TIME_UNITS_PER_SEC);
let two_min = Duration(2 * 60 * TIME_UNITS_PER_SEC);
let three_min = Duration(3 * 60 * TIME_UNITS_PER_SEC);
let four_min = Duration(4 * 60 * TIME_UNITS_PER_SEC);
let test_day1 = &Key(*b"2015-12-31");
let test_day2 = &Key(*b"2016-01-01");
m.adjust(test_time..test_time + one_min, 1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Add to a day.
m.adjust(test_time..test_time + one_min, 1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
// Subtract from a day.
m.adjust(test_time..test_time + one_min, -1);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Remove a day.
m.adjust(test_time..test_time + one_min, -1);
assert_eq!(0, m.len());
// Create two days.
m.adjust(test_time..test_time + three_min, 1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Add to two days.
m.adjust(test_time..test_time + three_min, 1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 2,
duration: four_min
}),
m.get(test_day2)
);
// Subtract from two days.
m.adjust(test_time..test_time + three_min, -1);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Remove two days.
m.adjust(test_time..test_time + three_min, -1);
assert_eq!(0, m.len());
}
#[test]
fn test_adjust_signal() {
testutil::init();
let mut m: Map<SignalValue> = Map::new();
let test_time = Time(130646844000000i64); // 2015-12-31 23:00:00 (Pacific).
let hr = Duration(60 * 60 * TIME_UNITS_PER_SEC);
let test_day1 = &Key(*b"2015-12-31");
let test_day2 = &Key(*b"2016-01-01");
let test_day3 = &Key(*b"2016-01-02");
m.adjust(test_time..test_time + hr * 30, 0, 3);
assert_eq!(3, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, hr.0 as u64],
})
);
assert_eq!(
m.get(test_day2),
Some(&SignalValue {
states: smallvec![0, 0, 24 * hr.0 as u64],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, 5 * hr.0 as u64],
})
);
m.adjust(Time(130647168000000)..Time(130654944000000), 3, 1); // entire 2016-01-01
assert_eq!(3, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, hr.0 as u64],
})
);
assert_eq!(
m.get(test_day2),
Some(&SignalValue {
states: smallvec![24 * hr.0 as u64],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, 5 * hr.0 as u64],
})
);
m.adjust(Time(130647168000000)..Time(130654944000000), 1, 0); // entire 2016-01-01
assert_eq!(2, m.len());
assert_eq!(
m.get(test_day1),
Some(&SignalValue {
states: smallvec![0, 0, hr.0 as u64],
})
);
assert_eq!(
m.get(test_day3),
Some(&SignalValue {
states: smallvec![0, 0, 5 * hr.0 as u64],
})
);
}
#[test]
fn test_day_bounds() {
testutil::init();
assert_eq!(
Key(*b"2017-10-10").bounds(), // normal day (24 hrs)
Time(135685692000000)..Time(135693468000000)
);
assert_eq!(
Key(*b"2017-03-12").bounds(), // spring forward (23 hrs)
Time(134037504000000)..Time(134044956000000)
);
assert_eq!(
Key(*b"2017-11-05").bounds(), // fall back (25 hrs)
Time(135887868000000)..Time(135895968000000)
);
}
}

View File

@ -27,9 +27,10 @@
//! cycles.
use crate::auth;
use crate::days;
use crate::dir;
use crate::raw;
use crate::recording::{self, TIME_UNITS_PER_SEC};
use crate::recording;
use crate::schema;
use crate::signal;
use base::clock::{self, Clocks};
@ -47,14 +48,12 @@ use std::cmp;
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryInto;
use std::fmt::Write as _;
use std::io::Write;
use std::mem;
use std::ops::Range;
use std::str;
use std::string::String;
use std::sync::Arc;
use std::vec::Vec;
use time;
use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information.
@ -278,50 +277,6 @@ pub(crate) struct ListOldestRecordingsRow {
pub sample_file_bytes: i32,
}
/// A calendar day in `YYYY-mm-dd` format.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct StreamDayKey([u8; 10]);
impl StreamDayKey {
fn new(tm: time::Tm) -> Result<Self, Error> {
let mut s = StreamDayKey([0u8; 10]);
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
Ok(s)
}
pub fn bounds(&self) -> Range<recording::Time> {
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
my_tm.tm_isdst = -1;
let start = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let end = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
start..end
}
}
impl AsRef<str> for StreamDayKey {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..]).expect("days are always UTF-8")
}
}
/// In-memory state about a particular camera on a particular day.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct StreamDayValue {
/// The number of recordings that overlap with this day. Note that `adjust_day` automatically
/// prunes days with 0 recordings.
pub recordings: i64,
/// The total wall duration recorded on this day. This can be 0; because frames' durations are
/// taken from the time of the next frame, a recording that ends unexpectedly after a single
/// frame will have 0 duration of that frame and thus the whole recording.
pub duration: recording::Duration,
}
#[derive(Debug)]
pub struct SampleFileDir {
pub id: i32,
@ -473,7 +428,7 @@ pub struct Stream {
/// Mapping of calendar day (in the server's time zone) to a summary of committed recordings on
/// that day.
pub committed_days: BTreeMap<StreamDayKey, StreamDayValue>,
pub committed_days: days::Map<days::StreamValue>,
pub record: bool,
/// The `cum_recordings` currently committed to the database.
@ -538,97 +493,6 @@ pub struct CameraChange {
pub streams: [StreamChange; 2],
}
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
fn adjust_day(
day: StreamDayKey,
delta: StreamDayValue,
m: &mut BTreeMap<StreamDayKey, StreamDayValue>,
) {
use ::std::collections::btree_map::Entry;
match m.entry(day) {
Entry::Vacant(e) => {
e.insert(delta);
}
Entry::Occupied(mut e) => {
let v = e.get_mut();
v.recordings += delta.recordings;
v.duration += delta.duration;
if v.recordings == 0 {
e.remove_entry();
}
}
}
}
/// Adjusts the day map `m` to reflect the range of the given recording.
/// Note that the specified range may span two days. It will never span more because the maximum
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
///
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
/// not much that can be done about them. (The database operation has already gone through.)
fn adjust_days(
r: Range<recording::Time>,
sign: i64,
m: &mut BTreeMap<StreamDayKey, StreamDayValue>,
) {
// Find first day key.
let mut my_tm = time::at(time::Timespec {
sec: r.start.unix_seconds(),
nsec: 0,
});
let day = match StreamDayKey::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill first day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
// Determine the start of the next day.
// Use mytm to hold a non-normalized representation of the boundary.
my_tm.tm_isdst = -1;
my_tm.tm_hour = 0;
my_tm.tm_min = 0;
my_tm.tm_sec = 0;
my_tm.tm_mday += 1;
let boundary = my_tm.to_timespec();
let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC;
// Adjust the first day.
let first_day_delta = StreamDayValue {
recordings: sign,
duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
};
adjust_day(day, first_day_delta, m);
if r.end.0 <= boundary_90k {
return;
}
// Fill day with the second day. This requires a normalized representation so recalculate.
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that result.)
let my_tm = time::at(boundary);
let day = match StreamDayKey::new(my_tm) {
Ok(d) => d,
Err(ref e) => {
error!(
"Unable to fill second day key from {:?}: {}; will ignore.",
my_tm, e
);
return;
}
};
let second_day_delta = StreamDayValue {
recordings: sign,
duration: recording::Duration(sign * (r.end.0 - boundary_90k)),
};
adjust_day(day, second_day_delta, m);
}
impl Stream {
/// Adds a single fully committed recording with the given properties to the in-memory state.
fn add_recording(&mut self, r: Range<recording::Time>, sample_file_bytes: i32) {
@ -639,18 +503,17 @@ impl Stream {
self.duration += r.end - r.start;
self.sample_file_bytes += sample_file_bytes as i64;
self.fs_bytes += round_up(i64::from(sample_file_bytes));
adjust_days(r, 1, &mut self.committed_days);
self.committed_days.adjust(r, 1);
}
/// Returns a days map including unflushed recordings.
pub fn days(&self) -> BTreeMap<StreamDayKey, StreamDayValue> {
pub fn days(&self) -> days::Map<days::StreamValue> {
let mut days = self.committed_days.clone();
for u in &self.uncommitted {
let l = u.lock();
adjust_days(
days.adjust(
l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)),
1,
&mut days,
);
}
days
@ -887,7 +750,7 @@ impl StreamStateChanger {
bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0),
committed_days: BTreeMap::new(),
committed_days: days::Map::new(),
record: sc.record,
cum_recordings: 0,
cum_media_duration: recording::Duration(0),
@ -963,7 +826,7 @@ impl LockedDatabase {
Some(s) => {
let l = s.lock();
r.prev_media_duration =
l.prev_media_duration + recording::Duration(l.wall_duration_90k.into());
l.prev_media_duration + recording::Duration(l.media_duration_90k.into());
r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
}
None => {
@ -1191,7 +1054,7 @@ impl LockedDatabase {
dir.garbage_needs_unlink.insert(row.id);
let d = recording::Duration(i64::from(row.wall_duration_90k));
s.duration -= d;
adjust_days(row.start..row.start + d, -1, &mut s.committed_days);
s.committed_days.adjust(row.start..row.start + d, -1);
}
// Process add_recordings.
@ -1790,7 +1653,7 @@ impl LockedDatabase {
bytes_to_add: 0,
fs_bytes_to_add: 0,
duration: recording::Duration(0),
committed_days: BTreeMap::new(),
committed_days: days::Map::new(),
cum_recordings: row.get(7)?,
cum_media_duration: recording::Duration(row.get(8)?),
cum_runs: row.get(9)?,
@ -2458,13 +2321,11 @@ impl<'db, C: Clocks + Clone> ::std::ops::DerefMut for DatabaseGuard<'db, C> {
#[cfg(test)]
mod tests {
use super::adjust_days; // non-public.
use super::*;
use crate::recording::{self, TIME_UNITS_PER_SEC};
use crate::testutil;
use base::clock;
use rusqlite::Connection;
use std::collections::BTreeMap;
use uuid::Uuid;
fn setup_conn() -> Connection {
@ -2566,131 +2427,6 @@ mod tests {
// TODO: with_recording_playback.
}
#[test]
fn test_adjust_days() {
testutil::init();
let mut m = BTreeMap::new();
// Create a day.
let test_time = recording::Time(130647162600000i64); // 2015-12-31 23:59:00 (Pacific).
let one_min = recording::Duration(60 * TIME_UNITS_PER_SEC);
let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC);
let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC);
let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC);
let test_day1 = &StreamDayKey(*b"2015-12-31");
let test_day2 = &StreamDayKey(*b"2016-01-01");
adjust_days(test_time..test_time + one_min, 1, &mut m);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Add to a day.
adjust_days(test_time..test_time + one_min, 1, &mut m);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
// Subtract from a day.
adjust_days(test_time..test_time + one_min, -1, &mut m);
assert_eq!(1, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
// Remove a day.
adjust_days(test_time..test_time + one_min, -1, &mut m);
assert_eq!(0, m.len());
// Create two days.
adjust_days(test_time..test_time + three_min, 1, &mut m);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Add to two days.
adjust_days(test_time..test_time + three_min, 1, &mut m);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 2,
duration: two_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamDayValue {
recordings: 2,
duration: four_min
}),
m.get(test_day2)
);
// Subtract from two days.
adjust_days(test_time..test_time + three_min, -1, &mut m);
assert_eq!(2, m.len());
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: one_min
}),
m.get(test_day1)
);
assert_eq!(
Some(&StreamDayValue {
recordings: 1,
duration: two_min
}),
m.get(test_day2)
);
// Remove two days.
adjust_days(test_time..test_time + three_min, -1, &mut m);
assert_eq!(0, m.len());
}
#[test]
fn test_day_bounds() {
testutil::init();
assert_eq!(
StreamDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs)
recording::Time(135685692000000)..recording::Time(135693468000000)
);
assert_eq!(
StreamDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs)
recording::Time(134037504000000)..recording::Time(134044956000000)
);
assert_eq!(
StreamDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs)
recording::Time(135887868000000)..recording::Time(135895968000000)
);
}
#[test]
fn test_no_meta_or_version() {
testutil::init();

View File

@ -8,6 +8,7 @@ pub mod auth;
pub mod check;
mod coding;
mod compare;
pub mod days;
pub mod db;
pub mod dir;
mod fs;

View File

@ -192,7 +192,8 @@ create table recording (
wall_duration_90k integer not null
check (wall_duration_90k >= 0 and wall_duration_90k < 5*60*90000),
-- TODO: comment.
-- The media-time duration of the recording, relative to wall_duration_90k.
-- That is, media_duration_90k = wall_duration_90k + media_duration_delta_90k.
media_duration_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
@ -412,6 +413,7 @@ create table user_session (
create index user_session_uid on user_session (user_id);
-- Timeseries with an enum value.
create table signal (
id integer primary key,
@ -427,8 +429,8 @@ create table signal (
-- uuids, such as "Elk security system watcher".
type_uuid blob not null check (length(type_uuid) = 16),
-- a short human-readable description of the event to use in mouseovers or event
-- lists, such as "driveway motion" or "front door open".
-- a short human-readable description to use in mouseovers or event lists,
-- such as "driveway motion" or "front door open".
short_name not null,
unique (source_uuid, type_uuid)

View File

@ -2,9 +2,9 @@
// Copyright (C) 2019 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
use crate::coding;
use crate::db::FromSqlUuid;
use crate::recording;
use crate::{coding, days};
use base::bail_t;
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
@ -24,6 +24,11 @@ pub(crate) struct State {
/// state for every `Type`.
types_by_uuid: FnvHashMap<Uuid, Type>,
/// All points in time.
/// Invariants, checked by `State::debug_assert_point_invariants`:
/// * the first point must have an empty previous state (all signals at state 0).
/// * each point's prev state matches the previous point's after state.
/// * the last point must have an empty final state (all signals changed to state 0).
points_by_time: BTreeMap<recording::Time, Point>,
/// Times which need to be flushed to the database.
@ -89,14 +94,7 @@ impl Point {
while let Some((signal, state)) = it.next().expect("in-mem prev is valid") {
after.insert(signal, state);
}
let mut it = self.changes();
while let Some((signal, state)) = it.next().expect("in-mem changes is valid") {
if state == 0 {
after.remove(&signal);
} else {
after.insert(signal, state);
}
}
self.changes().update_map(&mut after);
after
}
}
@ -173,6 +171,16 @@ impl<'a> PointDataIterator<'a> {
}
Ok(out)
}
fn update_map(mut self, m: &mut BTreeMap<u32, u16>) {
while let Some((signal, state)) = self.next().expect("in-mem changes is valid") {
if state == 0 {
m.remove(&signal);
} else {
m.insert(signal, state);
}
}
}
}
/// Representation of a `signal_camera` row.
@ -205,13 +213,17 @@ impl State {
})?;
let mut signals_by_id = State::init_signals(conn)?;
State::fill_signal_cameras(conn, &mut signals_by_id)?;
Ok(State {
let mut points_by_time = BTreeMap::new();
State::fill_points(conn, &mut points_by_time, &mut signals_by_id)?;
let s = State {
max_signal_changes,
signals_by_id,
types_by_uuid: State::init_types(conn)?,
points_by_time: State::init_points(conn)?,
points_by_time,
dirty_by_time: BTreeSet::new(),
})
};
s.debug_assert_point_invariants();
Ok(s)
}
pub fn list_changes_by_time(
@ -260,9 +272,10 @@ impl State {
}
// Apply the end before the start so that the `prev` state can be examined.
self.update_signals_end(when.end, signals, states);
self.update_signals_end(when.clone(), signals, states);
self.update_signals_start(when.start, signals, states);
self.update_signals_middle(when, signals, states);
self.debug_assert_point_invariants();
self.gc();
Ok(())
@ -287,16 +300,50 @@ impl State {
to_remove
);
self.gc_days(to_remove);
let remove: smallvec::SmallVec<[recording::Time; 4]> = self
.points_by_time
.keys()
.take(to_remove)
.map(|p| *p)
.map(|t| *t)
.collect();
for p in &remove {
self.points_by_time.remove(p);
self.dirty_by_time.insert(*p);
for t in &remove {
self.points_by_time.remove(t);
self.dirty_by_time.insert(*t);
}
// Update the first remaining point to keep state starting from it unchanged.
let (t, p) = match self.points_by_time.iter_mut().next() {
Some(e) => e,
None => return,
};
let combined = p.after();
p.changes_off = 0;
p.data = serialize(&combined).into_boxed_slice();
self.dirty_by_time.insert(*t);
self.debug_assert_point_invariants();
}
/// Adjusts each signal's days index to reflect garbage-collecting the first `to_remove` points.
fn gc_days(&mut self, to_remove: usize) {
let mut it = self.points_by_time.iter().take(to_remove + 1);
let (mut prev_time, mut prev_state) = match it.next() {
None => return, // nothing to do.
Some(p) => (*p.0, p.1.after()),
};
for (&new_time, point) in it {
let mut changes = point.changes();
while let Some((signal, state)) = changes.next().expect("in-mem points valid") {
let s = self
.signals_by_id
.get_mut(&signal)
.expect("in-mem point signals valid");
let prev_state = prev_state.entry(signal).or_default();
s.days.adjust(prev_time..new_time, *prev_state, state);
*prev_state = state;
}
prev_time = new_time;
}
}
@ -335,16 +382,35 @@ impl State {
}
/// Helper for `update_signals` to apply the end point.
fn update_signals_end(&mut self, end: recording::Time, signals: &[u32], states: &[u16]) {
fn update_signals_end(
&mut self,
when: Range<recording::Time>,
signals: &[u32],
states: &[u16],
) {
let mut prev;
let mut changes = BTreeMap::<u32, u16>::new();
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=end).next_back() {
if t == end {
let prev_t = self
.points_by_time
.range(when.clone())
.next_back()
.map(|e| *e.0)
.unwrap_or(when.start);
let days_range = prev_t..when.end;
if let Some((&t, ref mut p)) = self.points_by_time.range_mut(..=when.end).next_back() {
if t == when.end {
// Already have a point at end. Adjust it. prev starts unchanged...
prev = p.prev().to_map().expect("in-mem prev is valid");
// ...and then prev and changes are altered to reflect the desired update.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
State::update_signals_end_maps(
signals,
states,
days_range,
&mut self.signals_by_id,
&mut prev,
&mut changes,
);
// If this doesn't alter the new state, don't dirty the database.
if changes.is_empty() {
@ -372,31 +438,44 @@ impl State {
}
// Create a new end point if necessary.
State::update_signals_end_maps(signals, states, &mut prev, &mut changes);
State::update_signals_end_maps(
signals,
states,
days_range,
&mut self.signals_by_id,
&mut prev,
&mut changes,
);
if changes.is_empty() {
return;
}
self.dirty_by_time.insert(end);
self.dirty_by_time.insert(when.end);
self.points_by_time
.insert(end, Point::new(&prev, &serialize(&changes)));
.insert(when.end, Point::new(&prev, &serialize(&changes)));
}
/// Helper for `update_signals_end`. Adjusts `prev` (the state prior to the end point) to
/// reflect the desired update (in `signals` and `states`). Adjusts `changes` (changes to
/// execute at the end point) to undo the change.
/// execute at the end point) to undo the change. Adjust each signal's days index for
/// the range from the penultimate point of the range (or lacking that, its start) to the end.
fn update_signals_end_maps(
signals: &[u32],
states: &[u16],
days_range: Range<recording::Time>,
signals_by_id: &mut BTreeMap<u32, Signal>,
prev: &mut BTreeMap<u32, u16>,
changes: &mut BTreeMap<u32, u16>,
) {
for (&signal, &state) in signals.iter().zip(states) {
let old_state;
match prev.entry(signal) {
Entry::Vacant(e) => {
old_state = 0;
changes.insert(signal, 0);
e.insert(state);
}
Entry::Occupied(mut e) => {
old_state = *e.get();
if state == 0 {
changes.insert(signal, *e.get());
e.remove();
@ -406,6 +485,11 @@ impl State {
}
}
}
signals_by_id
.get_mut(&signal)
.expect("signal valid")
.days
.adjust(days_range.clone(), old_state, state);
}
}
@ -469,6 +553,7 @@ impl State {
}
/// Helper for `update_signals` to apply all points in `(when.start, when.end)`.
/// This also updates each signal's days index for the points it finds.
fn update_signals_middle(
&mut self,
when: Range<recording::Time>,
@ -477,13 +562,17 @@ impl State {
) {
let mut to_delete = Vec::new();
let after_start = recording::Time(when.start.0 + 1);
let mut prev_t = when.start;
for (&t, ref mut p) in self.points_by_time.range_mut(after_start..when.end) {
let mut prev = p.prev().to_map().expect("in-mem prev is valid");
// Update prev to reflect desired update.
// Update prev to reflect desired update; likewise each signal's days index.
for (&signal, &state) in signals.iter().zip(states) {
let s = self.signals_by_id.get_mut(&signal).expect("valid signals");
let prev_state;
match prev.entry(signal) {
Entry::Occupied(mut e) => {
prev_state = *e.get();
if state == 0 {
e.remove_entry();
} else if *e.get() != state {
@ -491,11 +580,14 @@ impl State {
}
}
Entry::Vacant(e) => {
prev_state = 0;
if state != 0 {
e.insert(state);
}
}
}
s.days.adjust(prev_t..t, prev_state, state);
prev_t = t;
}
// Trim changes to omit any change to signals.
@ -593,13 +685,20 @@ impl State {
type_: type_.0,
short_name: row.get(3)?,
cameras: Vec::new(),
days: days::Map::new(),
},
);
}
Ok(signals)
}
fn init_points(conn: &Connection) -> Result<BTreeMap<recording::Time, Point>, Error> {
/// Fills `points_by_time` from the database, also filling the `days`
/// index of each signal.
fn fill_points(
conn: &Connection,
points_by_time: &mut BTreeMap<recording::Time, Point>,
signals_by_id: &mut BTreeMap<u32, Signal>,
) -> Result<(), Error> {
let mut stmt = conn.prepare(
r#"
select
@ -611,22 +710,43 @@ impl State {
"#,
)?;
let mut rows = stmt.query(params![])?;
let mut points = BTreeMap::new();
let mut cur = BTreeMap::new(); // latest signal -> state, where state != 0
let mut sig_last_state = BTreeMap::new();
while let Some(row) = rows.next()? {
let time_90k = recording::Time(row.get(0)?);
let changes = row.get_raw_checked(1)?.as_blob()?;
let before = cur.clone();
let mut it = PointDataIterator::new(changes);
while let Some((signal, state)) = it.next()? {
let e = sig_last_state.entry(signal);
if let Entry::Occupied(ref e) = e {
let (prev_time, prev_state) = *e.get();
let s = signals_by_id.get_mut(&signal).ok_or_else(|| {
format_err!("time {} references invalid signal {}", time_90k, signal)
})?;
s.days.adjust(prev_time..time_90k, 0, prev_state);
}
if state == 0 {
cur.remove(&signal);
if let Entry::Occupied(e) = e {
e.remove_entry();
}
} else {
cur.insert(signal, state);
*e.or_default() = (time_90k, state);
}
}
points.insert(time_90k, Point::new(&cur, changes));
points_by_time.insert(time_90k, Point::new(&before, changes));
}
Ok(points)
if !cur.is_empty() {
bail!(
"far future state should be unknown for all signals; is: {:?}",
cur
);
}
Ok(())
}
/// Fills the `cameras` field of the `Signal` structs within the supplied `signals`.
@ -702,6 +822,25 @@ impl State {
pub fn types_by_uuid(&self) -> &FnvHashMap<Uuid, Type> {
&self.types_by_uuid
}
#[cfg(not(debug_assertions))]
fn debug_assert_point_invariants(&self) {}
/// Checks invariants on `points_by_time` (expensive).
#[cfg(debug_assertions)]
fn debug_assert_point_invariants(&self) {
let mut expected_prev = BTreeMap::new();
for (t, p) in self.points_by_time.iter() {
let cur = p.prev().to_map().expect("in-mem prev is valid");
assert_eq!(&expected_prev, &cur, "time {} prev mismatch", t);
p.changes().update_map(&mut expected_prev);
}
assert_eq!(
expected_prev.len(),
0,
"last point final state should be empty"
);
}
}
/// Representation of a `signal` row.
@ -714,6 +853,8 @@ pub struct Signal {
/// The cameras this signal is associated with. Sorted by camera id, which is unique.
pub cameras: Vec<SignalCamera>,
pub days: days::Map<days::SignalValue>,
}
/// Representation of a `signal_type_enum` row.
@ -738,6 +879,7 @@ mod tests {
use super::*;
use crate::{db, testutil};
use rusqlite::Connection;
use smallvec::smallvec;
#[test]
fn test_point_data_it() {
@ -821,6 +963,22 @@ mod tests {
&mut |r| rows.push(*r),
);
assert_eq!(&rows[..], EXPECTED);
let mut expected_days = days::Map::new();
expected_days.0.insert(
days::Key(*b"2019-04-26"),
days::SignalValue {
states: smallvec![0, (NOW - START).0 as u64],
},
);
assert_eq!(&s.signals_by_id.get(&1).unwrap().days, &expected_days);
expected_days.0.clear();
expected_days.0.insert(
days::Key(*b"2019-04-26"),
days::SignalValue {
states: smallvec![(NOW - START).0 as u64],
},
);
assert_eq!(&s.signals_by_id.get(&2).unwrap().days, &expected_days);
{
let tx = conn.transaction().unwrap();

View File

@ -6,7 +6,6 @@ use db::auth::SessionHash;
use failure::{format_err, Error};
use serde::ser::{Error as _, SerializeMap, SerializeSeq, Serializer};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::ops::Not;
use uuid::Uuid;
@ -86,7 +85,7 @@ pub struct Stream<'a> {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "Stream::serialize_days")]
pub days: Option<BTreeMap<db::StreamDayKey, db::StreamDayValue>>,
pub days: Option<db::days::Map<db::days::StreamValue>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<StreamConfig<'a>>,
@ -107,6 +106,10 @@ pub struct Signal<'a> {
pub source: Uuid,
pub type_: Uuid,
pub short_name: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "Signal::serialize_days")]
pub days: Option<&'a db::days::Map<db::days::SignalValue>>,
}
#[derive(Deserialize)]
@ -251,7 +254,7 @@ impl<'a> Stream<'a> {
}
fn serialize_days<S>(
days: &Option<BTreeMap<db::StreamDayKey, db::StreamDayValue>>,
days: &Option<db::days::Map<db::days::StreamValue>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
@ -276,13 +279,14 @@ impl<'a> Stream<'a> {
}
impl<'a> Signal<'a> {
pub fn wrap(s: &'a db::Signal, db: &'a db::LockedDatabase, _include_days: bool) -> Self {
pub fn wrap(s: &'a db::Signal, db: &'a db::LockedDatabase, include_days: bool) -> Self {
Signal {
id: s.id,
cameras: (s, db),
source: s.source,
type_: s.type_,
short_name: &s.short_name,
days: if include_days { Some(&s.days) } else { None },
}
}
@ -307,6 +311,30 @@ impl<'a> Signal<'a> {
}
map.end()
}
fn serialize_days<S>(
days: &Option<&db::days::Map<db::days::SignalValue>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let days = match *days {
Some(d) => d,
None => return serializer.serialize_none(),
};
let mut map = serializer.serialize_map(Some(days.len()))?;
for (k, v) in days {
map.serialize_key(k.as_ref())?;
let bounds = k.bounds();
map.serialize_value(&SignalDayValue {
start_time_90k: bounds.start.0,
end_time_90k: bounds.end.0,
states: &v.states[..],
})?;
}
map.end()
}
}
impl<'a> SignalType<'a> {
@ -348,6 +376,14 @@ struct StreamDayValue {
pub total_duration_90k: i64,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct SignalDayValue<'a> {
pub start_time_90k: i64,
pub end_time_90k: i64,
pub states: &'a [u64],
}
impl<'a> TopLevel<'a> {
/// Serializes cameras as a list (rather than a map), optionally including the `days` and
/// `cameras` fields.

View File

@ -1336,7 +1336,7 @@ impl FileBuilder {
struct Entry {
segment_duration: u64,
media_time: u64,
};
}
let mut flushed: Vec<Entry> = Vec::new();
let mut unflushed: Entry = Default::default();
let mut cur_media_time: u64 = 0;

View File

@ -14,8 +14,8 @@ use db::dir::SampleFileDir;
use db::{auth, recording};
use failure::{bail, format_err, Error};
use fnv::FnvHashMap;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::{future::Either, sink::SinkExt};
use http::header::{self, HeaderValue};
use http::{status::StatusCode, Request, Response};
use http_serve::dir::FsDir;
@ -443,7 +443,7 @@ impl Service {
stream_id: i32,
open_id: u32,
req: hyper::Request<hyper::Body>,
mut sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
) {
let upgraded = match hyper::upgrade::on(req).await {
Ok(u) => u,
@ -452,33 +452,60 @@ impl Service {
return;
}
};
let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
)
.await;
// Start the first segment at a key frame to reduce startup latency.
let mut start_at_key = true;
loop {
let live = match sub_rx.next().await {
Some(l) => l,
None => return,
};
info!("chunk: is_key={:?}", live.is_key);
if let Err(e) = self
.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key)
.await
{
info!("Dropping WebSocket after error: {}", e);
return;
}
start_at_key = false;
if let Err(e) = self
.stream_live_m4s_ws_loop(stream_id, open_id, sub_rx, ws)
.await
{
info!("Dropping WebSocket after error: {}", e);
}
}
/// Helper for `stream_live_m4s_ws` that returns error when the stream is dropped.
/// The outer function logs the error.
async fn stream_live_m4s_ws_loop(
self: Arc<Self>,
stream_id: i32,
open_id: u32,
sub_rx: futures::channel::mpsc::UnboundedReceiver<db::LiveSegment>,
mut ws: tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>,
) -> Result<(), Error> {
let keepalive = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
std::time::Duration::new(30, 0),
));
let mut combo = futures::stream::select(
sub_rx.map(|s| Either::Left(s)),
keepalive.map(|_| Either::Right(())),
);
// On the first LiveSegment, send all the data from the previous key frame onward.
// For LiveSegments, it's okay to send a single non-key frame at a time.
let mut start_at_key = true;
loop {
let next = combo
.next()
.await
.unwrap_or_else(|| unreachable!("timer stream never ends"));
match next {
Either::Left(live) => {
self.stream_live_m4s_chunk(open_id, stream_id, &mut ws, live, start_at_key)
.await?;
start_at_key = false;
}
Either::Right(_) => {
ws.send(tungstenite::Message::Ping(Vec::new())).await?;
}
}
}
}
/// Sends a single live segment chunk of a `live.m4s` stream.
async fn stream_live_m4s_chunk(
&self,
open_id: u32,