json-based config for cameras and streams

for #155

The config interface code for changing cameras is quite messy but
seems to work for now.
This commit is contained in:
Scott Lamb
2021-09-10 16:31:03 -07:00
parent 070400095d
commit dafd9041d6
14 changed files with 694 additions and 315 deletions

View File

@@ -54,7 +54,6 @@ use std::str;
use std::string::String;
use std::sync::Arc;
use std::vec::Vec;
use url::Url;
use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information.
@@ -359,24 +358,25 @@ pub struct Camera {
pub id: i32,
pub uuid: Uuid,
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: Option<String>,
pub password: Option<String>,
pub streams: [Option<i32>; 2],
pub config: crate::json::CameraConfig,
pub streams: [Option<i32>; NUM_STREAM_TYPES],
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StreamType {
Main,
Sub,
Ext,
}
pub const NUM_STREAM_TYPES: usize = 3;
impl StreamType {
pub fn from_index(i: usize) -> Option<Self> {
match i {
0 => Some(StreamType::Main),
1 => Some(StreamType::Sub),
2 => Some(StreamType::Ext),
_ => None,
}
}
@@ -385,6 +385,7 @@ impl StreamType {
match self {
StreamType::Main => 0,
StreamType::Sub => 1,
StreamType::Ext => 2,
}
}
@@ -392,6 +393,7 @@ impl StreamType {
match self {
StreamType::Main => "main",
StreamType::Sub => "sub",
StreamType::Ext => "ext",
}
}
@@ -399,6 +401,7 @@ impl StreamType {
match type_ {
"main" => Some(StreamType::Main),
"sub" => Some(StreamType::Sub),
"ext" => Some(StreamType::Ext),
_ => None,
}
}
@@ -410,16 +413,15 @@ impl ::std::fmt::Display for StreamType {
}
}
pub const ALL_STREAM_TYPES: [StreamType; 2] = [StreamType::Main, StreamType::Sub];
pub const ALL_STREAM_TYPES: [StreamType; NUM_STREAM_TYPES] =
[StreamType::Main, StreamType::Sub, StreamType::Ext];
pub struct Stream {
pub id: i32,
pub camera_id: i32,
pub sample_file_dir_id: Option<i32>,
pub type_: StreamType,
pub rtsp_url: String,
pub retain_bytes: i64,
pub flush_if_sec: i64,
pub config: crate::json::StreamConfig,
/// The time range of recorded data associated with this stream (minimum start time and maximum
/// end time). `None` iff there are no recordings for this camera.
@@ -455,7 +457,6 @@ pub struct Stream {
/// Mapping of calendar day (in the server's time zone) to a summary of committed recordings on
/// that day.
pub committed_days: days::Map<days::StreamValue>,
pub record: bool,
/// The `cum_recordings` currently committed to the database.
pub(crate) cum_recordings: i32,
@@ -500,24 +501,19 @@ pub struct LiveSegment {
#[derive(Clone, Debug, Default)]
pub struct StreamChange {
pub sample_file_dir_id: Option<i32>,
pub rtsp_url: Option<Url>,
pub record: bool,
pub flush_if_sec: i64,
pub config: crate::json::StreamConfig,
}
/// Information about a camera, used by `add_camera` and `update_camera`.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct CameraChange {
pub short_name: String,
pub description: String,
pub onvif_host: String,
pub username: Option<String>,
pub password: Option<String>,
pub config: crate::json::CameraConfig,
/// `StreamType t` is represented by `streams[t.index()]`. A default StreamChange will
/// correspond to no stream in the database, provided there are no existing recordings for that
/// stream.
pub streams: [StreamChange; 2],
pub streams: [StreamChange; NUM_STREAM_TYPES],
}
impl Stream {
@@ -528,7 +524,7 @@ impl Stream {
None => r.start..r.end,
});
self.duration += r.end - r.start;
self.sample_file_bytes += sample_file_bytes as i64;
self.sample_file_bytes += i64::from(sample_file_bytes);
self.fs_bytes += round_up(i64::from(sample_file_bytes));
self.committed_days.adjust(r, 1);
}
@@ -645,7 +641,7 @@ impl ::std::fmt::Display for CompositeId {
/// Inserts, updates, or removes streams in the `State` object to match a set of `StreamChange`
/// structs.
struct StreamStateChanger {
sids: [Option<i32>; 2],
sids: [Option<i32>; NUM_STREAM_TYPES],
streams: Vec<(i32, Option<(i32, StreamType, StreamChange)>)>,
}
@@ -659,8 +655,8 @@ impl StreamStateChanger {
streams_by_id: &BTreeMap<i32, Stream>,
change: &mut CameraChange,
) -> Result<Self, Error> {
let mut sids = [None; 2];
let mut streams = Vec::with_capacity(2);
let mut sids = [None; NUM_STREAM_TYPES];
let mut streams = Vec::with_capacity(NUM_STREAM_TYPES);
let existing_streams = existing.map(|e| e.streams).unwrap_or_default();
for (i, ref mut sc) in change.streams.iter_mut().enumerate() {
let type_ = StreamType::from_index(i).unwrap();
@@ -681,11 +677,7 @@ impl StreamStateChanger {
);
}
}
if !have_data
&& sc.rtsp_url.is_none()
&& sc.sample_file_dir_id.is_none()
&& !sc.record
{
if !have_data && sc.config.is_empty() && sc.sample_file_dir_id.is_none() {
// Delete stream.
let mut stmt = tx.prepare_cached(
r#"
@@ -701,18 +693,14 @@ impl StreamStateChanger {
let mut stmt = tx.prepare_cached(
r#"
update stream set
rtsp_url = :rtsp_url,
record = :record,
flush_if_sec = :flush_if_sec,
config = :config,
sample_file_dir_id = :sample_file_dir_id
where
id = :id
"#,
)?;
let rows = stmt.execute(named_params! {
":rtsp_url": &sc.rtsp_url.as_ref().map(Url::as_str),
":record": sc.record,
":flush_if_sec": sc.flush_if_sec,
":config": &sc.config,
":sample_file_dir_id": sc.sample_file_dir_id,
":id": sid,
})?;
@@ -724,28 +712,24 @@ impl StreamStateChanger {
streams.push((sid, Some((camera_id, type_, sc))));
}
} else {
if sc.rtsp_url.is_none() && sc.sample_file_dir_id.is_none() && !sc.record {
if sc.config.is_empty() && sc.sample_file_dir_id.is_none() {
// Do nothing; there is no record and we want to keep it that way.
continue;
}
// Insert stream.
let mut stmt = tx.prepare_cached(
r#"
insert into stream (camera_id, sample_file_dir_id, type, rtsp_url, record,
retain_bytes, flush_if_sec, cum_recordings,
cum_media_duration_90k, cum_runs)
values (:camera_id, :sample_file_dir_id, :type, :rtsp_url, :record,
0, :flush_if_sec, 0,
0, 0)
insert into stream (camera_id, sample_file_dir_id, type, config,
cum_recordings, cum_media_duration_90k, cum_runs)
values (:camera_id, :sample_file_dir_id, :type, :config,
0, 0, 0)
"#,
)?;
stmt.execute(named_params! {
":camera_id": camera_id,
":sample_file_dir_id": sc.sample_file_dir_id,
":type": type_.as_str(),
":rtsp_url": &sc.rtsp_url.as_ref().map(Url::as_str),
":record": sc.record,
":flush_if_sec": sc.flush_if_sec,
":config": &sc.config,
})?;
let id = tx.last_insert_rowid() as i32;
sids[i] = Some(id);
@@ -758,19 +742,20 @@ impl StreamStateChanger {
/// Applies the change to the given `streams_by_id`. The caller is expected to set
/// `Camera::streams` to the return value.
fn apply(mut self, streams_by_id: &mut BTreeMap<i32, Stream>) -> [Option<i32>; 2] {
fn apply(
mut self,
streams_by_id: &mut BTreeMap<i32, Stream>,
) -> [Option<i32>; NUM_STREAM_TYPES] {
for (id, stream) in self.streams.drain(..) {
use ::std::collections::btree_map::Entry;
match (streams_by_id.entry(id), stream) {
(Entry::Vacant(e), Some((camera_id, type_, mut sc))) => {
(Entry::Vacant(e), Some((camera_id, type_, sc))) => {
e.insert(Stream {
id,
type_,
camera_id,
sample_file_dir_id: sc.sample_file_dir_id,
rtsp_url: sc.rtsp_url.take().map(String::from).unwrap_or_default(),
retain_bytes: 0,
flush_if_sec: sc.flush_if_sec,
config: sc.config,
range: None,
sample_file_bytes: 0,
fs_bytes: 0,
@@ -781,7 +766,6 @@ impl StreamStateChanger {
fs_bytes_to_add: 0,
duration: recording::Duration(0),
committed_days: days::Map::default(),
record: sc.record,
cum_recordings: 0,
cum_media_duration: recording::Duration(0),
cum_runs: 0,
@@ -791,12 +775,10 @@ impl StreamStateChanger {
});
}
(Entry::Vacant(_), None) => {}
(Entry::Occupied(e), Some((_, _, mut sc))) => {
(Entry::Occupied(e), Some((_, _, sc))) => {
let e = e.into_mut();
e.sample_file_dir_id = sc.sample_file_dir_id;
e.rtsp_url = sc.rtsp_url.take().map(String::from).unwrap_or_default();
e.record = sc.record;
e.flush_if_sec = sc.flush_if_sec;
e.config = sc.config;
}
(Entry::Occupied(e), None) => {
e.remove();
@@ -1596,10 +1578,7 @@ impl LockedDatabase {
id,
uuid,
short_name,
description,
onvif_host,
username,
password
config
from
camera;
"#,
@@ -1614,10 +1593,7 @@ impl LockedDatabase {
id,
uuid: uuid.0,
short_name: row.get(2)?,
description: row.get(3)?,
onvif_host: row.get(4)?,
username: row.get(5)?,
password: row.get(6)?,
config: row.get(3)?,
streams: Default::default(),
},
);
@@ -1638,13 +1614,10 @@ impl LockedDatabase {
type,
camera_id,
sample_file_dir_id,
rtsp_url,
retain_bytes,
flush_if_sec,
config,
cum_recordings,
cum_media_duration_90k,
cum_runs,
record
cum_runs
from
stream;
"#,
@@ -1660,7 +1633,6 @@ impl LockedDatabase {
.cameras_by_id
.get_mut(&camera_id)
.ok_or_else(|| format_err!("missing camera {} for stream {}", camera_id, id))?;
let flush_if_sec = row.get(6)?;
self.streams_by_id.insert(
id,
Stream {
@@ -1668,9 +1640,7 @@ impl LockedDatabase {
type_,
camera_id,
sample_file_dir_id: row.get(3)?,
rtsp_url: row.get(4)?,
retain_bytes: row.get(5)?,
flush_if_sec,
config: row.get(4)?,
range: None,
sample_file_bytes: 0,
fs_bytes: 0,
@@ -1681,10 +1651,9 @@ impl LockedDatabase {
fs_bytes_to_add: 0,
duration: recording::Duration(0),
committed_days: days::Map::default(),
cum_recordings: row.get(7)?,
cum_media_duration: recording::Duration(row.get(8)?),
cum_runs: row.get(9)?,
record: row.get(10)?,
cum_recordings: row.get(5)?,
cum_media_duration: recording::Duration(row.get(6)?),
cum_runs: row.get(7)?,
uncommitted: VecDeque::new(),
synced_recordings: 0,
on_live_segment: Vec::new(),
@@ -1854,19 +1823,14 @@ impl LockedDatabase {
{
let mut stmt = tx.prepare_cached(
r#"
insert into camera (uuid, short_name, description, onvif_host, username,
password)
values (:uuid, :short_name, :description, :onvif_host, :username,
:password)
insert into camera (uuid, short_name, config)
values (:uuid, :short_name, :config)
"#,
)?;
stmt.execute(named_params! {
":uuid": uuid_bytes,
":short_name": &camera.short_name,
":description": &camera.description,
":onvif_host": &camera.onvif_host,
":username": &camera.username,
":password": &camera.password,
":config": &camera.config,
})?;
camera_id = tx.last_insert_rowid() as i32;
streams =
@@ -1880,10 +1844,7 @@ impl LockedDatabase {
id: camera_id,
uuid,
short_name: camera.short_name,
description: camera.description,
onvif_host: camera.onvif_host,
username: camera.username,
password: camera.password,
config: camera.config,
streams,
},
);
@@ -1891,6 +1852,36 @@ impl LockedDatabase {
Ok(camera_id)
}
/// Returns a `CameraChange` for the given camera which does nothing.
///
/// The caller can modify it to taste then pass it to `update_camera`.
/// TODO: consider renaming this to `update_camera` and creating a bulk
/// `apply_camera_changes`.
pub fn null_camera_change(&mut self, camera_id: i32) -> Result<CameraChange, Error> {
let camera = self
.cameras_by_id
.get(&camera_id)
.ok_or_else(|| format_err!("no such camera {}", camera_id))?;
let mut change = CameraChange {
short_name: camera.short_name.clone(),
config: camera.config.clone(),
streams: Default::default(),
};
for i in 0..NUM_STREAM_TYPES {
if let Some(stream_id) = camera.streams[i] {
let s = self
.streams_by_id
.get(&stream_id)
.expect("cameras reference valid streams");
change.streams[i] = StreamChange {
sample_file_dir_id: s.sample_file_dir_id,
config: s.config.clone(),
};
}
}
Ok(change)
}
/// Updates a camera.
pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> {
let tx = self.conn.transaction()?;
@@ -1906,10 +1897,7 @@ impl LockedDatabase {
r#"
update camera set
short_name = :short_name,
description = :description,
onvif_host = :onvif_host,
username = :username,
password = :password
config = :config
where
id = :id
"#,
@@ -1917,10 +1905,7 @@ impl LockedDatabase {
let rows = stmt.execute(named_params! {
":id": camera_id,
":short_name": &camera.short_name,
":description": &camera.description,
":onvif_host": &camera.onvif_host,
":username": &camera.username,
":password": &camera.password,
":config": &camera.config,
})?;
if rows != 1 {
bail!("Camera {} missing from database", camera_id);
@@ -1928,16 +1913,14 @@ impl LockedDatabase {
}
tx.commit()?;
c.short_name = camera.short_name;
c.description = camera.description;
c.onvif_host = camera.onvif_host;
c.username = camera.username;
c.password = camera.password;
c.config = camera.config;
c.streams = streams.apply(&mut self.streams_by_id);
Ok(())
}
/// Deletes a camera and its streams. The camera must have no recordings.
pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> {
// TODO: also verify there are no uncommitted recordings.
let uuid = self
.cameras_by_id
.get(&id)
@@ -1975,35 +1958,34 @@ impl LockedDatabase {
Ok(())
}
// TODO: it'd make more sense to have a bulk camera/stream edit API than
// this specific one.
pub fn update_retention(&mut self, changes: &[RetentionChange]) -> Result<(), Error> {
// TODO: should validate there's only one change per id.
let tx = self.conn.transaction()?;
{
let mut stmt = tx.prepare_cached(
r#"
update stream
set
record = :record,
retain_bytes = :retain
config = :config
where
id = :id
"#,
)?;
for c in changes {
if c.new_limit < 0 {
bail!(
"can't set limit for stream {} to {}; must be >= 0",
c.stream_id,
c.new_limit
);
}
let stream = self
.streams_by_id
.get(&c.stream_id)
.ok_or_else(|| format_err!("no such stream id {}", c.stream_id))?;
let mut new_config = stream.config.clone();
new_config.mode = (if c.new_record { "record" } else { "" }).into();
new_config.retain_bytes = c.new_limit;
let rows = stmt.execute(named_params! {
":record": c.new_record,
":retain": c.new_limit,
":config": &new_config,
":id": c.stream_id,
})?;
if rows != 1 {
bail!("no such stream {}", c.stream_id);
}
assert_eq!(rows, 1, "missing stream {}", c.stream_id);
}
}
tx.commit()?;
@@ -2012,8 +1994,8 @@ impl LockedDatabase {
.streams_by_id
.get_mut(&c.stream_id)
.expect("stream in db but not state");
s.record = c.new_record;
s.retain_bytes = c.new_limit;
s.config.mode = (if c.new_record { "record" } else { "" }).into();
s.config.retain_bytes = c.new_limit;
}
Ok(())
}
@@ -2369,6 +2351,7 @@ mod tests {
use crate::testutil;
use base::clock;
use rusqlite::Connection;
use url::Url;
use uuid::Uuid;
fn setup_conn() -> Connection {
@@ -2386,9 +2369,12 @@ mod tests {
rows += 1;
camera_id = row.id;
assert_eq!(uuid, row.uuid);
assert_eq!("test-camera", row.onvif_host);
assert_eq!(Some("foo"), row.username.as_deref());
assert_eq!(Some("bar"), row.password.as_deref());
assert_eq!(
"http://test-camera/",
row.config.onvif_base_url.as_ref().unwrap().as_str()
);
assert_eq!("foo", &row.config.username);
assert_eq!("bar", &row.config.password);
//assert_eq!("/main", row.main_rtsp_url);
//assert_eq!("/sub", row.sub_rtsp_url);
//assert_eq!(42, row.retain_bytes);
@@ -2537,23 +2523,33 @@ mod tests {
let sample_file_dir_id = { db.lock() }.add_sample_file_dir(path).unwrap();
let mut c = CameraChange {
short_name: "testcam".to_owned(),
description: "".to_owned(),
onvif_host: "test-camera".to_owned(),
username: Some("foo".to_owned()),
password: Some("bar".to_owned()),
config: crate::json::CameraConfig {
description: "".to_owned(),
onvif_base_url: Some(Url::parse("http://test-camera/").unwrap()),
username: "foo".to_owned(),
password: "bar".to_owned(),
..Default::default()
},
streams: [
StreamChange {
sample_file_dir_id: Some(sample_file_dir_id),
rtsp_url: Some(Url::parse("rtsp://test-camera/main").unwrap()),
record: false,
flush_if_sec: 1,
config: crate::json::StreamConfig {
url: Some(Url::parse("rtsp://test-camera/main").unwrap()),
mode: crate::json::STREAM_MODE_RECORD.to_owned(),
flush_if_sec: 1,
..Default::default()
},
},
StreamChange {
sample_file_dir_id: Some(sample_file_dir_id),
rtsp_url: Some(Url::parse("rtsp://test-camera/sub").unwrap()),
record: true,
flush_if_sec: 1,
config: crate::json::StreamConfig {
url: Some(Url::parse("rtsp://test-camera/sub").unwrap()),
mode: crate::json::STREAM_MODE_RECORD.to_owned(),
flush_if_sec: 1,
..Default::default()
},
},
StreamChange::default(),
],
};
let camera_id = db.lock().add_camera(c.clone()).unwrap();
@@ -2573,19 +2569,27 @@ mod tests {
.unwrap();
{
let main = l.streams_by_id().get(&main_stream_id).unwrap();
assert!(main.record);
assert_eq!(main.retain_bytes, 42);
assert_eq!(main.flush_if_sec, 1);
assert_eq!(main.config.mode, crate::json::STREAM_MODE_RECORD);
assert_eq!(main.config.retain_bytes, 42);
assert_eq!(main.config.flush_if_sec, 1);
}
assert_eq!(
l.streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec,
l.streams_by_id()
.get(&sub_stream_id)
.unwrap()
.config
.flush_if_sec,
1
);
c.streams[1].flush_if_sec = 2;
c.streams[1].config.flush_if_sec = 2;
l.update_camera(camera_id, c).unwrap();
assert_eq!(
l.streams_by_id().get(&sub_stream_id).unwrap().flush_if_sec,
l.streams_by_id()
.get(&sub_stream_id)
.unwrap()
.config
.flush_if_sec,
2
);
}
@@ -2608,6 +2612,7 @@ mod tests {
.streams_by_id()
.get(&sub_stream_id)
.unwrap()
.config
.flush_if_sec,
2
);