mirror of
https://github.com/scottlamb/moonfire-nvr.git
synced 2024-12-26 23:25:55 -05:00
extract & generalize calendar day indexes
My main goal is to support creating indexes for signals as well as recordings. An additional goal is to just shrink db.rs a bit; it's gotten quite large.
This commit is contained in:
parent
3c057af896
commit
3ec60b85a3
320
server/db/days.rs
Normal file
320
server/db/days.rs
Normal file
@ -0,0 +1,320 @@
|
|||||||
|
// This file is part of Moonfire NVR, a security camera network video recorder.
|
||||||
|
// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
|
||||||
|
// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
|
||||||
|
|
||||||
|
//! In-memory indexes by calendar day.
|
||||||
|
|
||||||
|
use crate::recording::{self, Time};
|
||||||
|
use failure::Error;
|
||||||
|
use log::{error, trace};
|
||||||
|
use std::cmp;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::ops::Range;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
|
/// A calendar day in `YYYY-mm-dd` format.
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
|
||||||
|
pub struct Key([u8; 10]);
|
||||||
|
|
||||||
|
impl Key {
|
||||||
|
fn new(tm: time::Tm) -> Result<Self, Error> {
|
||||||
|
let mut s = Key([0u8; 10]);
|
||||||
|
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
|
||||||
|
Ok(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bounds(&self) -> Range<Time> {
|
||||||
|
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
|
||||||
|
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
|
||||||
|
my_tm.tm_isdst = -1;
|
||||||
|
let start = Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
|
||||||
|
my_tm.tm_hour = 0;
|
||||||
|
my_tm.tm_min = 0;
|
||||||
|
my_tm.tm_sec = 0;
|
||||||
|
my_tm.tm_mday += 1;
|
||||||
|
let end = Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
|
||||||
|
start..end
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<str> for Key {
|
||||||
|
fn as_ref(&self) -> &str {
|
||||||
|
str::from_utf8(&self.0[..]).expect("days are always UTF-8")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Value: std::fmt::Debug + Default {
|
||||||
|
type Change: std::fmt::Debug;
|
||||||
|
|
||||||
|
/// Applies the given change to this value.
|
||||||
|
fn apply(&mut self, c: &Self::Change);
|
||||||
|
|
||||||
|
fn is_empty(&self) -> bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory state about a particular stream on a particular day.
|
||||||
|
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
|
||||||
|
pub struct StreamValue {
|
||||||
|
/// The number of recordings that overlap with this day.
|
||||||
|
pub recordings: i64,
|
||||||
|
|
||||||
|
/// The total duration recorded on this day. This can be 0; because frames' durations are taken
|
||||||
|
/// from the time of the next frame, a recording that ends unexpectedly after a single frame
|
||||||
|
/// will have 0 duration of that frame and thus the whole recording.
|
||||||
|
pub duration: recording::Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Value for StreamValue {
|
||||||
|
type Change = Self;
|
||||||
|
|
||||||
|
fn apply(&mut self, c: &StreamValue) {
|
||||||
|
self.recordings += c.recordings;
|
||||||
|
self.duration += c.duration;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.recordings == 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Map<V: Value>(BTreeMap<Key, V>);
|
||||||
|
|
||||||
|
impl<V: Value> Map<V> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(BTreeMap::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
pub fn get(&self, k: &Key) -> Option<&V> {
|
||||||
|
self.0.get(k)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
|
||||||
|
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
|
||||||
|
fn adjust_day(&mut self, day: Key, c: V::Change) {
|
||||||
|
trace!("adjust_day {} {:?}", day.as_ref(), &c);
|
||||||
|
use ::std::collections::btree_map::Entry;
|
||||||
|
match self.0.entry(day) {
|
||||||
|
Entry::Vacant(e) => e.insert(Default::default()).apply(&c),
|
||||||
|
Entry::Occupied(mut e) => {
|
||||||
|
let v = e.get_mut();
|
||||||
|
v.apply(&c);
|
||||||
|
if v.is_empty() {
|
||||||
|
e.remove_entry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, V: Value> IntoIterator for &'a Map<V> {
|
||||||
|
type Item = (&'a Key, &'a V);
|
||||||
|
type IntoIter = std::collections::btree_map::Iter<'a, Key, V>;
|
||||||
|
|
||||||
|
fn into_iter(self) -> Self::IntoIter {
|
||||||
|
self.0.iter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Map<StreamValue> {
|
||||||
|
/// Adjusts `self` to reflect the range of the given recording.
|
||||||
|
/// Note that the specified range may span two days. It will never span more because the maximum
|
||||||
|
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
|
||||||
|
///
|
||||||
|
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
|
||||||
|
/// not much that can be done about them. (The database operation has already gone through.)
|
||||||
|
pub(crate) fn adjust(&mut self, r: Range<Time>, sign: i64) {
|
||||||
|
// Find first day key.
|
||||||
|
let mut my_tm = time::at(time::Timespec {
|
||||||
|
sec: r.start.unix_seconds(),
|
||||||
|
nsec: 0,
|
||||||
|
});
|
||||||
|
let day = match Key::new(my_tm) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(ref e) => {
|
||||||
|
error!(
|
||||||
|
"Unable to fill first day key from {:?}: {}; will ignore.",
|
||||||
|
my_tm, e
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Determine the start of the next day.
|
||||||
|
// Use mytm to hold a non-normalized representation of the boundary.
|
||||||
|
my_tm.tm_isdst = -1;
|
||||||
|
my_tm.tm_hour = 0;
|
||||||
|
my_tm.tm_min = 0;
|
||||||
|
my_tm.tm_sec = 0;
|
||||||
|
my_tm.tm_mday += 1;
|
||||||
|
let boundary = my_tm.to_timespec();
|
||||||
|
let boundary_90k = boundary.sec * recording::TIME_UNITS_PER_SEC;
|
||||||
|
|
||||||
|
// Adjust the first day.
|
||||||
|
let first_day_delta = StreamValue {
|
||||||
|
recordings: sign,
|
||||||
|
duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
|
||||||
|
};
|
||||||
|
self.adjust_day(day, first_day_delta);
|
||||||
|
|
||||||
|
if r.end.0 <= boundary_90k {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill day with the second day. This requires a normalized representation so recalculate.
|
||||||
|
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that
|
||||||
|
// result.)
|
||||||
|
let my_tm = time::at(boundary);
|
||||||
|
let day = match Key::new(my_tm) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(ref e) => {
|
||||||
|
error!(
|
||||||
|
"Unable to fill second day key from {:?}: {}; will ignore.",
|
||||||
|
my_tm, e
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let second_day_delta = StreamValue {
|
||||||
|
recordings: sign,
|
||||||
|
duration: recording::Duration(sign * (r.end.0 - boundary_90k)),
|
||||||
|
};
|
||||||
|
self.adjust_day(day, second_day_delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{Key, Map, StreamValue};
|
||||||
|
use crate::recording::{self, TIME_UNITS_PER_SEC};
|
||||||
|
use crate::testutil;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adjust_stream() {
|
||||||
|
testutil::init();
|
||||||
|
let mut m: Map<StreamValue> = Map::new();
|
||||||
|
|
||||||
|
// Create a day.
|
||||||
|
let test_time = recording::Time(130647162600000i64); // 2015-12-31 23:59:00 (Pacific).
|
||||||
|
let one_min = recording::Duration(60 * TIME_UNITS_PER_SEC);
|
||||||
|
let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC);
|
||||||
|
let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC);
|
||||||
|
let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC);
|
||||||
|
let test_day1 = &Key(*b"2015-12-31");
|
||||||
|
let test_day2 = &Key(*b"2016-01-01");
|
||||||
|
m.adjust(test_time..test_time + one_min, 1);
|
||||||
|
assert_eq!(1, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: one_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add to a day.
|
||||||
|
m.adjust(test_time..test_time + one_min, 1);
|
||||||
|
assert_eq!(1, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 2,
|
||||||
|
duration: two_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Subtract from a day.
|
||||||
|
m.adjust(test_time..test_time + one_min, -1);
|
||||||
|
assert_eq!(1, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: one_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Remove a day.
|
||||||
|
m.adjust(test_time..test_time + one_min, -1);
|
||||||
|
assert_eq!(0, m.len());
|
||||||
|
|
||||||
|
// Create two days.
|
||||||
|
m.adjust(test_time..test_time + three_min, 1);
|
||||||
|
assert_eq!(2, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: one_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: two_min
|
||||||
|
}),
|
||||||
|
m.get(test_day2)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add to two days.
|
||||||
|
m.adjust(test_time..test_time + three_min, 1);
|
||||||
|
assert_eq!(2, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 2,
|
||||||
|
duration: two_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 2,
|
||||||
|
duration: four_min
|
||||||
|
}),
|
||||||
|
m.get(test_day2)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Subtract from two days.
|
||||||
|
m.adjust(test_time..test_time + three_min, -1);
|
||||||
|
assert_eq!(2, m.len());
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: one_min
|
||||||
|
}),
|
||||||
|
m.get(test_day1)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Some(&StreamValue {
|
||||||
|
recordings: 1,
|
||||||
|
duration: two_min
|
||||||
|
}),
|
||||||
|
m.get(test_day2)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Remove two days.
|
||||||
|
m.adjust(test_time..test_time + three_min, -1);
|
||||||
|
assert_eq!(0, m.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_day_bounds() {
|
||||||
|
testutil::init();
|
||||||
|
assert_eq!(
|
||||||
|
Key(*b"2017-10-10").bounds(), // normal day (24 hrs)
|
||||||
|
recording::Time(135685692000000)..recording::Time(135693468000000)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Key(*b"2017-03-12").bounds(), // spring forward (23 hrs)
|
||||||
|
recording::Time(134037504000000)..recording::Time(134044956000000)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Key(*b"2017-11-05").bounds(), // fall back (25 hrs)
|
||||||
|
recording::Time(135887868000000)..recording::Time(135895968000000)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
282
server/db/db.rs
282
server/db/db.rs
@ -27,9 +27,10 @@
|
|||||||
//! cycles.
|
//! cycles.
|
||||||
|
|
||||||
use crate::auth;
|
use crate::auth;
|
||||||
|
use crate::days;
|
||||||
use crate::dir;
|
use crate::dir;
|
||||||
use crate::raw;
|
use crate::raw;
|
||||||
use crate::recording::{self, TIME_UNITS_PER_SEC};
|
use crate::recording;
|
||||||
use crate::schema;
|
use crate::schema;
|
||||||
use crate::signal;
|
use crate::signal;
|
||||||
use base::clock::{self, Clocks};
|
use base::clock::{self, Clocks};
|
||||||
@ -47,14 +48,12 @@ use std::cmp;
|
|||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fmt::Write as _;
|
use std::fmt::Write as _;
|
||||||
use std::io::Write;
|
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::string::String;
|
use std::string::String;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::vec::Vec;
|
use std::vec::Vec;
|
||||||
use time;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Expected schema version. See `guide/schema.md` for more information.
|
/// Expected schema version. See `guide/schema.md` for more information.
|
||||||
@ -278,50 +277,6 @@ pub(crate) struct ListOldestRecordingsRow {
|
|||||||
pub sample_file_bytes: i32,
|
pub sample_file_bytes: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A calendar day in `YYYY-mm-dd` format.
|
|
||||||
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
|
|
||||||
pub struct StreamDayKey([u8; 10]);
|
|
||||||
|
|
||||||
impl StreamDayKey {
|
|
||||||
fn new(tm: time::Tm) -> Result<Self, Error> {
|
|
||||||
let mut s = StreamDayKey([0u8; 10]);
|
|
||||||
write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?;
|
|
||||||
Ok(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn bounds(&self) -> Range<recording::Time> {
|
|
||||||
let mut my_tm = time::strptime(self.as_ref(), "%Y-%m-%d").expect("days must be parseable");
|
|
||||||
my_tm.tm_utcoff = 1; // to the time crate, values != 0 mean local time.
|
|
||||||
my_tm.tm_isdst = -1;
|
|
||||||
let start = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
|
|
||||||
my_tm.tm_hour = 0;
|
|
||||||
my_tm.tm_min = 0;
|
|
||||||
my_tm.tm_sec = 0;
|
|
||||||
my_tm.tm_mday += 1;
|
|
||||||
let end = recording::Time(my_tm.to_timespec().sec * recording::TIME_UNITS_PER_SEC);
|
|
||||||
start..end
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsRef<str> for StreamDayKey {
|
|
||||||
fn as_ref(&self) -> &str {
|
|
||||||
str::from_utf8(&self.0[..]).expect("days are always UTF-8")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// In-memory state about a particular camera on a particular day.
|
|
||||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
|
||||||
pub struct StreamDayValue {
|
|
||||||
/// The number of recordings that overlap with this day. Note that `adjust_day` automatically
|
|
||||||
/// prunes days with 0 recordings.
|
|
||||||
pub recordings: i64,
|
|
||||||
|
|
||||||
/// The total wall duration recorded on this day. This can be 0; because frames' durations are
|
|
||||||
/// taken from the time of the next frame, a recording that ends unexpectedly after a single
|
|
||||||
/// frame will have 0 duration of that frame and thus the whole recording.
|
|
||||||
pub duration: recording::Duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SampleFileDir {
|
pub struct SampleFileDir {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
@ -473,7 +428,7 @@ pub struct Stream {
|
|||||||
|
|
||||||
/// Mapping of calendar day (in the server's time zone) to a summary of committed recordings on
|
/// Mapping of calendar day (in the server's time zone) to a summary of committed recordings on
|
||||||
/// that day.
|
/// that day.
|
||||||
pub committed_days: BTreeMap<StreamDayKey, StreamDayValue>,
|
pub committed_days: days::Map<days::StreamValue>,
|
||||||
pub record: bool,
|
pub record: bool,
|
||||||
|
|
||||||
/// The `cum_recordings` currently committed to the database.
|
/// The `cum_recordings` currently committed to the database.
|
||||||
@ -538,97 +493,6 @@ pub struct CameraChange {
|
|||||||
pub streams: [StreamChange; 2],
|
pub streams: [StreamChange; 2],
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds non-zero `delta` to the day represented by `day` in the map `m`.
|
|
||||||
/// Inserts a map entry if absent; removes the entry if it has 0 entries on exit.
|
|
||||||
fn adjust_day(
|
|
||||||
day: StreamDayKey,
|
|
||||||
delta: StreamDayValue,
|
|
||||||
m: &mut BTreeMap<StreamDayKey, StreamDayValue>,
|
|
||||||
) {
|
|
||||||
use ::std::collections::btree_map::Entry;
|
|
||||||
match m.entry(day) {
|
|
||||||
Entry::Vacant(e) => {
|
|
||||||
e.insert(delta);
|
|
||||||
}
|
|
||||||
Entry::Occupied(mut e) => {
|
|
||||||
let v = e.get_mut();
|
|
||||||
v.recordings += delta.recordings;
|
|
||||||
v.duration += delta.duration;
|
|
||||||
if v.recordings == 0 {
|
|
||||||
e.remove_entry();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Adjusts the day map `m` to reflect the range of the given recording.
|
|
||||||
/// Note that the specified range may span two days. It will never span more because the maximum
|
|
||||||
/// length of a recording entry is less than a day (even a 23-hour "spring forward" day).
|
|
||||||
///
|
|
||||||
/// This function swallows/logs date formatting errors because they shouldn't happen and there's
|
|
||||||
/// not much that can be done about them. (The database operation has already gone through.)
|
|
||||||
fn adjust_days(
|
|
||||||
r: Range<recording::Time>,
|
|
||||||
sign: i64,
|
|
||||||
m: &mut BTreeMap<StreamDayKey, StreamDayValue>,
|
|
||||||
) {
|
|
||||||
// Find first day key.
|
|
||||||
let mut my_tm = time::at(time::Timespec {
|
|
||||||
sec: r.start.unix_seconds(),
|
|
||||||
nsec: 0,
|
|
||||||
});
|
|
||||||
let day = match StreamDayKey::new(my_tm) {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err(ref e) => {
|
|
||||||
error!(
|
|
||||||
"Unable to fill first day key from {:?}: {}; will ignore.",
|
|
||||||
my_tm, e
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Determine the start of the next day.
|
|
||||||
// Use mytm to hold a non-normalized representation of the boundary.
|
|
||||||
my_tm.tm_isdst = -1;
|
|
||||||
my_tm.tm_hour = 0;
|
|
||||||
my_tm.tm_min = 0;
|
|
||||||
my_tm.tm_sec = 0;
|
|
||||||
my_tm.tm_mday += 1;
|
|
||||||
let boundary = my_tm.to_timespec();
|
|
||||||
let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC;
|
|
||||||
|
|
||||||
// Adjust the first day.
|
|
||||||
let first_day_delta = StreamDayValue {
|
|
||||||
recordings: sign,
|
|
||||||
duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)),
|
|
||||||
};
|
|
||||||
adjust_day(day, first_day_delta, m);
|
|
||||||
|
|
||||||
if r.end.0 <= boundary_90k {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fill day with the second day. This requires a normalized representation so recalculate.
|
|
||||||
// (The C mktime(3) already normalized for us once, but .to_timespec() discarded that result.)
|
|
||||||
let my_tm = time::at(boundary);
|
|
||||||
let day = match StreamDayKey::new(my_tm) {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err(ref e) => {
|
|
||||||
error!(
|
|
||||||
"Unable to fill second day key from {:?}: {}; will ignore.",
|
|
||||||
my_tm, e
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let second_day_delta = StreamDayValue {
|
|
||||||
recordings: sign,
|
|
||||||
duration: recording::Duration(sign * (r.end.0 - boundary_90k)),
|
|
||||||
};
|
|
||||||
adjust_day(day, second_day_delta, m);
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream {
|
impl Stream {
|
||||||
/// Adds a single fully committed recording with the given properties to the in-memory state.
|
/// Adds a single fully committed recording with the given properties to the in-memory state.
|
||||||
fn add_recording(&mut self, r: Range<recording::Time>, sample_file_bytes: i32) {
|
fn add_recording(&mut self, r: Range<recording::Time>, sample_file_bytes: i32) {
|
||||||
@ -639,18 +503,17 @@ impl Stream {
|
|||||||
self.duration += r.end - r.start;
|
self.duration += r.end - r.start;
|
||||||
self.sample_file_bytes += sample_file_bytes as i64;
|
self.sample_file_bytes += sample_file_bytes as i64;
|
||||||
self.fs_bytes += round_up(i64::from(sample_file_bytes));
|
self.fs_bytes += round_up(i64::from(sample_file_bytes));
|
||||||
adjust_days(r, 1, &mut self.committed_days);
|
self.committed_days.adjust(r, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a days map including unflushed recordings.
|
/// Returns a days map including unflushed recordings.
|
||||||
pub fn days(&self) -> BTreeMap<StreamDayKey, StreamDayValue> {
|
pub fn days(&self) -> days::Map<days::StreamValue> {
|
||||||
let mut days = self.committed_days.clone();
|
let mut days = self.committed_days.clone();
|
||||||
for u in &self.uncommitted {
|
for u in &self.uncommitted {
|
||||||
let l = u.lock();
|
let l = u.lock();
|
||||||
adjust_days(
|
days.adjust(
|
||||||
l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)),
|
l.start..l.start + recording::Duration(i64::from(l.wall_duration_90k)),
|
||||||
1,
|
1,
|
||||||
&mut days,
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
days
|
days
|
||||||
@ -887,7 +750,7 @@ impl StreamStateChanger {
|
|||||||
bytes_to_add: 0,
|
bytes_to_add: 0,
|
||||||
fs_bytes_to_add: 0,
|
fs_bytes_to_add: 0,
|
||||||
duration: recording::Duration(0),
|
duration: recording::Duration(0),
|
||||||
committed_days: BTreeMap::new(),
|
committed_days: days::Map::new(),
|
||||||
record: sc.record,
|
record: sc.record,
|
||||||
cum_recordings: 0,
|
cum_recordings: 0,
|
||||||
cum_media_duration: recording::Duration(0),
|
cum_media_duration: recording::Duration(0),
|
||||||
@ -1191,7 +1054,7 @@ impl LockedDatabase {
|
|||||||
dir.garbage_needs_unlink.insert(row.id);
|
dir.garbage_needs_unlink.insert(row.id);
|
||||||
let d = recording::Duration(i64::from(row.wall_duration_90k));
|
let d = recording::Duration(i64::from(row.wall_duration_90k));
|
||||||
s.duration -= d;
|
s.duration -= d;
|
||||||
adjust_days(row.start..row.start + d, -1, &mut s.committed_days);
|
s.committed_days.adjust(row.start..row.start + d, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process add_recordings.
|
// Process add_recordings.
|
||||||
@ -1790,7 +1653,7 @@ impl LockedDatabase {
|
|||||||
bytes_to_add: 0,
|
bytes_to_add: 0,
|
||||||
fs_bytes_to_add: 0,
|
fs_bytes_to_add: 0,
|
||||||
duration: recording::Duration(0),
|
duration: recording::Duration(0),
|
||||||
committed_days: BTreeMap::new(),
|
committed_days: days::Map::new(),
|
||||||
cum_recordings: row.get(7)?,
|
cum_recordings: row.get(7)?,
|
||||||
cum_media_duration: recording::Duration(row.get(8)?),
|
cum_media_duration: recording::Duration(row.get(8)?),
|
||||||
cum_runs: row.get(9)?,
|
cum_runs: row.get(9)?,
|
||||||
@ -2458,13 +2321,11 @@ impl<'db, C: Clocks + Clone> ::std::ops::DerefMut for DatabaseGuard<'db, C> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::adjust_days; // non-public.
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::recording::{self, TIME_UNITS_PER_SEC};
|
use crate::recording::{self, TIME_UNITS_PER_SEC};
|
||||||
use crate::testutil;
|
use crate::testutil;
|
||||||
use base::clock;
|
use base::clock;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
fn setup_conn() -> Connection {
|
fn setup_conn() -> Connection {
|
||||||
@ -2566,131 +2427,6 @@ mod tests {
|
|||||||
// TODO: with_recording_playback.
|
// TODO: with_recording_playback.
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_adjust_days() {
|
|
||||||
testutil::init();
|
|
||||||
let mut m = BTreeMap::new();
|
|
||||||
|
|
||||||
// Create a day.
|
|
||||||
let test_time = recording::Time(130647162600000i64); // 2015-12-31 23:59:00 (Pacific).
|
|
||||||
let one_min = recording::Duration(60 * TIME_UNITS_PER_SEC);
|
|
||||||
let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC);
|
|
||||||
let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC);
|
|
||||||
let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC);
|
|
||||||
let test_day1 = &StreamDayKey(*b"2015-12-31");
|
|
||||||
let test_day2 = &StreamDayKey(*b"2016-01-01");
|
|
||||||
adjust_days(test_time..test_time + one_min, 1, &mut m);
|
|
||||||
assert_eq!(1, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: one_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Add to a day.
|
|
||||||
adjust_days(test_time..test_time + one_min, 1, &mut m);
|
|
||||||
assert_eq!(1, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 2,
|
|
||||||
duration: two_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Subtract from a day.
|
|
||||||
adjust_days(test_time..test_time + one_min, -1, &mut m);
|
|
||||||
assert_eq!(1, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: one_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Remove a day.
|
|
||||||
adjust_days(test_time..test_time + one_min, -1, &mut m);
|
|
||||||
assert_eq!(0, m.len());
|
|
||||||
|
|
||||||
// Create two days.
|
|
||||||
adjust_days(test_time..test_time + three_min, 1, &mut m);
|
|
||||||
assert_eq!(2, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: one_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: two_min
|
|
||||||
}),
|
|
||||||
m.get(test_day2)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Add to two days.
|
|
||||||
adjust_days(test_time..test_time + three_min, 1, &mut m);
|
|
||||||
assert_eq!(2, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 2,
|
|
||||||
duration: two_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 2,
|
|
||||||
duration: four_min
|
|
||||||
}),
|
|
||||||
m.get(test_day2)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Subtract from two days.
|
|
||||||
adjust_days(test_time..test_time + three_min, -1, &mut m);
|
|
||||||
assert_eq!(2, m.len());
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: one_min
|
|
||||||
}),
|
|
||||||
m.get(test_day1)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
Some(&StreamDayValue {
|
|
||||||
recordings: 1,
|
|
||||||
duration: two_min
|
|
||||||
}),
|
|
||||||
m.get(test_day2)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Remove two days.
|
|
||||||
adjust_days(test_time..test_time + three_min, -1, &mut m);
|
|
||||||
assert_eq!(0, m.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_day_bounds() {
|
|
||||||
testutil::init();
|
|
||||||
assert_eq!(
|
|
||||||
StreamDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs)
|
|
||||||
recording::Time(135685692000000)..recording::Time(135693468000000)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
StreamDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs)
|
|
||||||
recording::Time(134037504000000)..recording::Time(134044956000000)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
StreamDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs)
|
|
||||||
recording::Time(135887868000000)..recording::Time(135895968000000)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_no_meta_or_version() {
|
fn test_no_meta_or_version() {
|
||||||
testutil::init();
|
testutil::init();
|
||||||
|
@ -8,6 +8,7 @@ pub mod auth;
|
|||||||
pub mod check;
|
pub mod check;
|
||||||
mod coding;
|
mod coding;
|
||||||
mod compare;
|
mod compare;
|
||||||
|
pub mod days;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
pub mod dir;
|
pub mod dir;
|
||||||
mod fs;
|
mod fs;
|
||||||
|
@ -6,7 +6,6 @@ use db::auth::SessionHash;
|
|||||||
use failure::{format_err, Error};
|
use failure::{format_err, Error};
|
||||||
use serde::ser::{Error as _, SerializeMap, SerializeSeq, Serializer};
|
use serde::ser::{Error as _, SerializeMap, SerializeSeq, Serializer};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use std::ops::Not;
|
use std::ops::Not;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@ -86,7 +85,7 @@ pub struct Stream<'a> {
|
|||||||
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
#[serde(serialize_with = "Stream::serialize_days")]
|
#[serde(serialize_with = "Stream::serialize_days")]
|
||||||
pub days: Option<BTreeMap<db::StreamDayKey, db::StreamDayValue>>,
|
pub days: Option<db::days::Map<db::days::StreamValue>>,
|
||||||
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub config: Option<StreamConfig<'a>>,
|
pub config: Option<StreamConfig<'a>>,
|
||||||
@ -251,7 +250,7 @@ impl<'a> Stream<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_days<S>(
|
fn serialize_days<S>(
|
||||||
days: &Option<BTreeMap<db::StreamDayKey, db::StreamDayValue>>,
|
days: &Option<db::days::Map<db::days::StreamValue>>,
|
||||||
serializer: S,
|
serializer: S,
|
||||||
) -> Result<S::Ok, S::Error>
|
) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
|
Loading…
Reference in New Issue
Block a user