schema 2: add a "record" bool to streams

This commit is contained in:
Scott Lamb 2018-01-30 15:29:19 -08:00
parent dc402bdc01
commit 57c44b5e35
6 changed files with 74 additions and 30 deletions

View File

@ -45,6 +45,7 @@ use super::{decode_size, encode_size};
struct Stream { struct Stream {
label: String, label: String,
used: i64, used: i64,
record: bool,
retain: Option<i64>, // None if unparseable retain: Option<i64>, // None if unparseable
} }
@ -63,7 +64,7 @@ fn update_limits_inner(model: &Model) -> Result<(), Error> {
let mut db = model.db.lock(); let mut db = model.db.lock();
let mut tx = db.tx()?; let mut tx = db.tx()?;
for (&id, stream) in &model.streams { for (&id, stream) in &model.streams {
tx.update_retention(id, stream.retain.unwrap())?; tx.update_retention(id, stream.record, stream.retain.unwrap())?;
} }
tx.commit() tx.commit()
} }
@ -114,6 +115,13 @@ fn edit_limit(model: &RefCell<Model>, siv: &mut Cursive, id: i32, content: &str)
} }
} }
fn edit_record(model: &RefCell<Model>, id: i32, record: bool) {
let mut model = model.borrow_mut();
let model: &mut Model = &mut *model;
let stream = model.streams.get_mut(&id).unwrap();
stream.record = record;
}
fn confirm_deletion(model: &RefCell<Model>, siv: &mut Cursive, to_delete: i64) { fn confirm_deletion(model: &RefCell<Model>, siv: &mut Cursive, to_delete: i64) {
let typed = siv.find_id::<views::EditView>("confirm") let typed = siv.find_id::<views::EditView>("confirm")
.unwrap() .unwrap()
@ -188,6 +196,7 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
streams.insert(id, Stream { streams.insert(id, Stream {
label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()), label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()),
used: s.sample_file_bytes, used: s.sample_file_bytes,
record: s.record,
retain: Some(s.retain_bytes), retain: Some(s.retain_bytes),
}); });
total_used += s.sample_file_bytes; total_used += s.sample_file_bytes;
@ -207,17 +216,28 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
})) }))
}; };
const RECORD_WIDTH: usize = 8;
const BYTES_WIDTH: usize = 20;
let mut list = views::ListView::new(); let mut list = views::ListView::new();
list.add_child( list.add_child(
"stream", "stream",
views::LinearLayout::horizontal() views::LinearLayout::horizontal()
.child(views::TextView::new("usage").fixed_width(25)) .child(views::TextView::new("record").fixed_width(RECORD_WIDTH))
.child(views::TextView::new("limit").fixed_width(25))); .child(views::TextView::new("usage").fixed_width(BYTES_WIDTH))
.child(views::TextView::new("limit").fixed_width(BYTES_WIDTH)));
for (&id, stream) in &model.borrow().streams { for (&id, stream) in &model.borrow().streams {
let mut record_cb = views::Checkbox::new();
record_cb.set_checked(stream.record);
record_cb.set_on_change({
let model = model.clone();
move |_siv, record| edit_record(&model, id, record)
});
list.add_child( list.add_child(
&stream.label, &stream.label,
views::LinearLayout::horizontal() views::LinearLayout::horizontal()
.child(views::TextView::new(encode_size(stream.used)).fixed_width(25)) .child(record_cb.fixed_width(RECORD_WIDTH))
.child(views::TextView::new(encode_size(stream.used)).fixed_width(BYTES_WIDTH))
.child(views::EditView::new() .child(views::EditView::new()
.content(encode_size(stream.retain.unwrap())) .content(encode_size(stream.retain.unwrap()))
.on_edit({ .on_edit({
@ -228,21 +248,24 @@ pub fn add_dialog(db: &Arc<db::Database>, dir: &Arc<dir::SampleFileDir>, siv: &m
let model = model.clone(); let model = model.clone();
move |siv, _| press_change(&model, siv) move |siv, _| press_change(&model, siv)
}) })
.fixed_width(25)) .fixed_width(20))
.child(views::TextView::new("").with_id(format!("{}_ok", id)).fixed_width(1))); .child(views::TextView::new("").with_id(format!("{}_ok", id)).fixed_width(1)));
} }
let over = model.borrow().total_retain > model.borrow().fs_capacity; let over = model.borrow().total_retain > model.borrow().fs_capacity;
list.add_child( list.add_child(
"total", "total",
views::LinearLayout::horizontal() views::LinearLayout::horizontal()
.child(views::TextView::new(encode_size(model.borrow().total_used)).fixed_width(25)) .child(views::DummyView{}.fixed_width(RECORD_WIDTH))
.child(views::TextView::new(encode_size(model.borrow().total_used))
.fixed_width(BYTES_WIDTH))
.child(views::TextView::new(encode_size(model.borrow().total_retain)) .child(views::TextView::new(encode_size(model.borrow().total_retain))
.with_id("total_retain").fixed_width(25)) .with_id("total_retain").fixed_width(BYTES_WIDTH))
.child(views::TextView::new(if over { "*" } else { " " }).with_id("total_ok"))); .child(views::TextView::new(if over { "*" } else { " " }).with_id("total_ok")));
list.add_child( list.add_child(
"filesystem", "filesystem",
views::LinearLayout::horizontal() views::LinearLayout::horizontal()
.child(views::TextView::new("").fixed_width(25)) .child(views::DummyView{}.fixed_width(3))
.child(views::DummyView{}.fixed_width(20))
.child(views::TextView::new(encode_size(model.borrow().fs_capacity)).fixed_width(25))); .child(views::TextView::new(encode_size(model.borrow().fs_capacity)).fixed_width(25)));
let mut change_button = views::Button::new("Change", { let mut change_button = views::Button::new("Change", {
let model = model.clone(); let model = model.clone();

View File

@ -106,7 +106,6 @@ pub fn run() -> Result<(), Error> {
let s = web::Service::new(db.clone(), dir.clone(), Some(&args.flag_ui_dir), resolve_zone())?; let s = web::Service::new(db.clone(), dir.clone(), Some(&args.flag_ui_dir), resolve_zone())?;
// Start a streamer for each stream. // Start a streamer for each stream.
// TODO: enabled only.
let shutdown_streamers = Arc::new(AtomicBool::new(false)); let shutdown_streamers = Arc::new(AtomicBool::new(false));
let mut streamers = Vec::new(); let mut streamers = Vec::new();
let syncer = if !args.flag_read_only { let syncer = if !args.flag_read_only {
@ -121,6 +120,9 @@ pub fn run() -> Result<(), Error> {
shutdown: &shutdown_streamers, shutdown: &shutdown_streamers,
}; };
for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { for (i, (id, stream)) in l.streams_by_id().iter().enumerate() {
if !stream.record {
continue;
}
let camera = l.cameras_by_id().get(&stream.camera_id).unwrap(); let camera = l.cameras_by_id().get(&stream.camera_id).unwrap();
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64;
let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera,

View File

@ -420,6 +420,7 @@ pub struct Stream {
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
pub days: BTreeMap<StreamDayKey, StreamDayValue>, pub days: BTreeMap<StreamDayKey, StreamDayValue>,
pub record: bool,
next_recording_id: i32, next_recording_id: i32,
} }
@ -592,6 +593,7 @@ pub struct Transaction<'a> {
} }
/// A modification to be done to a `Stream` after a `Transaction` is committed. /// A modification to be done to a `Stream` after a `Transaction` is committed.
#[derive(Default)]
struct StreamModification { struct StreamModification {
/// Add this to `camera.duration`. Thus, positive values indicate a net addition; /// Add this to `camera.duration`. Thus, positive values indicate a net addition;
/// negative values indicate a net subtraction. /// negative values indicate a net subtraction.
@ -612,6 +614,9 @@ struct StreamModification {
/// Reset the retain_bytes to the specified value. /// Reset the retain_bytes to the specified value.
new_retain_bytes: Option<i64>, new_retain_bytes: Option<i64>,
/// Reset the record to the specified value.
new_record: Option<bool>,
} }
fn composite_id(stream_id: i32, recording_id: i32) -> i64 { fn composite_id(stream_id: i32, recording_id: i32) -> i64 {
@ -754,22 +759,34 @@ impl<'a> Transaction<'a> {
Ok(recording_id) Ok(recording_id)
} }
/// Updates the `retain_bytes` for the given stream to the specified limit. /// Updates the `record` and `retain_bytes` for the given stream.
/// Note this just resets the limit in the database; it's the caller's responsibility to ensure /// Note this just resets the limit in the database; it's the caller's responsibility to ensure
/// current usage is under the new limit if desired. /// current usage is under the new limit if desired.
pub fn update_retention(&mut self, stream_id: i32, new_limit: i64) -> Result<(), Error> { pub fn update_retention(&mut self, stream_id: i32, new_record: bool, new_limit: i64)
-> Result<(), Error> {
if new_limit < 0 { if new_limit < 0 {
return Err(Error::new(format!("can't set limit for stream {} to {}; must be >= 0", return Err(Error::new(format!("can't set limit for stream {} to {}; must be >= 0",
stream_id, new_limit))); stream_id, new_limit)));
} }
self.check_must_rollback()?; self.check_must_rollback()?;
let mut stmt = let mut stmt = self.tx.prepare_cached(r#"
self.tx.prepare_cached("update stream set retain_bytes = :retain where id = :id")?; update stream
let changes = stmt.execute_named(&[(":retain", &new_limit), (":id", &stream_id)])?; set
record = :record,
retain_bytes = :retain
where
id = :id
"#)?;
let changes = stmt.execute_named(&[
(":record", &new_record),
(":retain", &new_limit),
(":id", &stream_id),
])?;
if changes != 1 { if changes != 1 {
return Err(Error::new(format!("no such stream {}", stream_id))); return Err(Error::new(format!("no such stream {}", stream_id)));
} }
let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id); let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id);
m.new_record = Some(new_record);
m.new_retain_bytes = Some(new_limit); m.new_retain_bytes = Some(new_limit);
Ok(()) Ok(())
} }
@ -791,6 +808,9 @@ impl<'a> Transaction<'a> {
if let Some(id) = m.new_next_recording_id { if let Some(id) = m.new_next_recording_id {
stream.next_recording_id = id; stream.next_recording_id = id;
} }
if let Some(r) = m.new_record {
stream.record = r;
}
if let Some(b) = m.new_retain_bytes { if let Some(b) = m.new_retain_bytes {
stream.retain_bytes = b; stream.retain_bytes = b;
} }
@ -809,16 +829,7 @@ impl<'a> Transaction<'a> {
/// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry. /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry.
fn get_mods_by_stream(mods: &mut fnv::FnvHashMap<i32, StreamModification>, stream_id: i32) fn get_mods_by_stream(mods: &mut fnv::FnvHashMap<i32, StreamModification>, stream_id: i32)
-> &mut StreamModification { -> &mut StreamModification {
mods.entry(stream_id).or_insert_with(|| { mods.entry(stream_id).or_insert_with(StreamModification::default)
StreamModification{
duration: recording::Duration(0),
sample_file_bytes: 0,
range: None,
days: BTreeMap::new(),
new_next_recording_id: None,
new_retain_bytes: None,
}
})
} }
/// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the /// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the
@ -877,8 +888,8 @@ struct StreamInserter<'tx> {
impl<'tx> StreamInserter<'tx> { impl<'tx> StreamInserter<'tx> {
fn new(tx: &'tx rusqlite::Transaction) -> Result<Self, Error> { fn new(tx: &'tx rusqlite::Transaction) -> Result<Self, Error> {
let stmt = tx.prepare(r#" let stmt = tx.prepare(r#"
insert into stream (camera_id, type, rtsp_path, retain_bytes, next_recording_id) insert into stream (camera_id, type, rtsp_path, record, retain_bytes, next_recording_id)
values (:camera_id, :type, :rtsp_path, 0, 1) values (:camera_id, :type, :rtsp_path, 0, 0, 1)
"#)?; "#)?;
Ok(StreamInserter { Ok(StreamInserter {
tx, tx,
@ -904,6 +915,7 @@ impl<'tx> StreamInserter<'tx> {
sample_file_bytes: 0, sample_file_bytes: 0,
duration: recording::Duration(0), duration: recording::Duration(0),
days: BTreeMap::new(), days: BTreeMap::new(),
record: false,
next_recording_id: 1, next_recording_id: 1,
}); });
Ok(()) Ok(())
@ -1237,7 +1249,8 @@ impl LockedDatabase {
camera_id, camera_id,
rtsp_path, rtsp_path,
retain_bytes, retain_bytes,
next_recording_id next_recording_id,
record
from from
stream; stream;
"#)?; "#)?;
@ -1260,6 +1273,7 @@ impl LockedDatabase {
duration: recording::Duration(0), duration: recording::Duration(0),
days: BTreeMap::new(), days: BTreeMap::new(),
next_recording_id: row.get_checked(5)?, next_recording_id: row.get_checked(5)?,
record: row.get_checked(6)?,
}); });
let c = self.state.cameras_by_id.get_mut(&camera_id) let c = self.state.cameras_by_id.get_mut(&camera_id)
.ok_or_else(|| Error::new("missing camera".to_owned()))?; .ok_or_else(|| Error::new("missing camera".to_owned()))?;
@ -1811,7 +1825,7 @@ mod tests {
let mut l = db.lock(); let mut l = db.lock();
let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap(); let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap();
let mut tx = l.tx().unwrap(); let mut tx = l.tx().unwrap();
tx.update_retention(stream_id, 42).unwrap(); tx.update_retention(stream_id, true, 42).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
} }
let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid }; let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid };

View File

@ -154,7 +154,7 @@ impl fmt::Display for Time {
/// A duration specified in 1/90,000ths of a second. /// A duration specified in 1/90,000ths of a second.
/// Durations are typically non-negative, but a `db::CameraDayValue::duration` may be negative. /// Durations are typically non-negative, but a `db::CameraDayValue::duration` may be negative.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct Duration(pub i64); pub struct Duration(pub i64);
impl fmt::Display for Duration { impl fmt::Display for Duration {

View File

@ -71,6 +71,11 @@ create table stream (
camera_id integer not null references camera (id), camera_id integer not null references camera (id),
type text not null check (type in ('main', 'sub')), type text not null check (type in ('main', 'sub')),
-- If record is true, the stream should start recording when moonfire
-- starts. If false, no new recordings will be made, but old recordings
-- will not be deleted.
record integer not null check (record in (1, 0)),
-- The path (starting with "/") to use in rtsp:// URLs to for this stream. -- The path (starting with "/") to use in rtsp:// URLs to for this stream.
rtsp_path text not null, rtsp_path text not null,

View File

@ -97,7 +97,7 @@ impl TestDb {
}).unwrap()); }).unwrap());
test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid; test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid;
let mut tx = l.tx().unwrap(); let mut tx = l.tx().unwrap();
tx.update_retention(TEST_STREAM_ID, 1048576).unwrap(); tx.update_retention(TEST_STREAM_ID, true, 1048576).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
} }
let path = tmpdir.path().to_str().unwrap().to_owned(); let path = tmpdir.path().to_str().unwrap().to_owned();