From dc402bdc0122837e25d9af9b59a5c744707dba89 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Tue, 23 Jan 2018 11:05:07 -0800 Subject: [PATCH 01/47] schema version 2: support sub streams This allows each camera to have a main and a sub stream. Previously there was a field in the schema for the sub stream's url, but it didn't do anything. Now you can configure individual retention for main and sub streams. They show up grouped in the UI. No support for upgrading from schema version 1 yet. --- design/api.md | 137 +++---- package.json | 2 +- src/cmds/config/cameras.rs | 67 +++- src/cmds/config/mod.rs | 3 +- src/cmds/config/retention.rs | 68 ++-- src/cmds/run.rs | 16 +- src/db.rs | 702 ++++++++++++++++++++++------------- src/dir.rs | 62 ++-- src/json.rs | 81 ++-- src/mp4.rs | 20 +- src/recording.rs | 22 +- src/schema.sql | 35 +- src/streamer.rs | 29 +- src/testutil.rs | 15 +- src/web.rs | 93 +++-- ui-src/index.html | 4 +- ui-src/index.js | 179 +++++---- 17 files changed, 936 insertions(+), 599 deletions(-) diff --git a/design/api.md b/design/api.md index 60a689e..c35cad0 100644 --- a/design/api.md +++ b/design/api.md @@ -44,29 +44,32 @@ The `application/json` response will have a dict as follows: * `uuid`: in text format * `shortName`: a short name (typically one or two words) * `description`: a longer description (typically a phrase or paragraph) - * `retainBytes`: the configured total number of bytes of completed - recordings to retain. - * `minStartTime90k`: the start time of the earliest recording for this - camera, in 90kHz units since 1970-01-01 00:00:00 UTC. - * `maxEndTime90k`: the end time of the latest recording for this camera, - in 90kHz units since 1970-01-01 00:00:00 UTC. - * `totalDuration90k`: the total duration recorded, in 90 kHz units. - This is no greater than `maxEndTime90k - maxStartTime90k`; it will be - lesser if there are gaps in the recorded data. 
- * `totalSampleFileBytes`: the total number of bytes of sample data (the - `mdat` portion of a `.mp4` file). - * `days`: object representing calendar days (in the server's time zone) - with non-zero total duration of recordings for that day. The keys are - of the form `YYYY-mm-dd`; the values are objects with the following - attributes: - * `totalDuration90k` is the total duration recorded during that day. - If a recording spans a day boundary, some portion of it is accounted to - each day. - * `startTime90k` is the start of that calendar day in the server's time - zone. - * `endTime90k` is the end of that calendar day in the server's time zone. - It is usually 24 hours after the start time. It might be 23 hours or 25 - hours during spring forward or fall back, respectively. + * `streams`: a dict of stream type ("main" or "sub") to a dictionary + describing the stream: + * `retainBytes`: the configured total number of bytes of completed + recordings to retain. + * `minStartTime90k`: the start time of the earliest recording for + this camera, in 90kHz units since 1970-01-01 00:00:00 UTC. + * `maxEndTime90k`: the end time of the latest recording for this + camera, in 90kHz units since 1970-01-01 00:00:00 UTC. + * `totalDuration90k`: the total duration recorded, in 90 kHz units. + This is no greater than `maxEndTime90k - maxStartTime90k`; it will + be lesser if there are gaps in the recorded data. + * `totalSampleFileBytes`: the total number of bytes of sample data + (the `mdat` portion of a `.mp4` file). + * `days`: object representing calendar days (in the server's time + zone) with non-zero total duration of recordings for that day. The + keys are of the form `YYYY-mm-dd`; the values are objects with the + following attributes: + * `totalDuration90k` is the total duration recorded during that + day. If a recording spans a day boundary, some portion of it + is accounted to each day. + * `startTime90k` is the start of that calendar day in the + server's time zone. 
+ * `endTime90k` is the end of that calendar day in the server's + time zone. It is usually 24 hours after the start time. It + might be 23 hours or 25 hours during spring forward or fall + back, respectively. Example response: @@ -78,23 +81,27 @@ Example response: "uuid": "fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe", "shortName": "driveway", "description": "Hikvision DS-2CD2032 overlooking the driveway from east", - "retainBytes": 536870912000, - "minStartTime90k": 130888729442361, - "maxEndTime90k": 130985466591817, - "totalDuration90k": 96736169725, - "totalSampleFileBytes": 446774393937, - "days": { - "2016-05-01": { - "endTime90k": 131595516000000, - "startTime90k": 131587740000000, - "totalDuration90k": 52617609 - }, - "2016-05-02": { - "endTime90k": 131603292000000, - "startTime90k": 131595516000000, - "totalDuration90k": 20946022 + "streams": { + "main": { + "retainBytes": 536870912000, + "minStartTime90k": 130888729442361, + "maxEndTime90k": 130985466591817, + "totalDuration90k": 96736169725, + "totalSampleFileBytes": 446774393937, + "days": { + "2016-05-01": { + "endTime90k": 131595516000000, + "startTime90k": 131587740000000, + "totalDuration90k": 52617609 + }, + "2016-05-02": { + "endTime90k": 131603292000000, + "startTime90k": 131595516000000, + "totalDuration90k": 20946022 + } + } } - }, + } }, ... 
], @@ -109,29 +116,33 @@ Example response: ```json { - "days": { - "2016-05-01": { - "endTime90k": 131595516000000, - "startTime90k": 131587740000000, - "totalDuration90k": 52617609 - }, - "2016-05-02": { - "endTime90k": 131603292000000, - "startTime90k": 131595516000000, - "totalDuration90k": 20946022 + "description": "", + "streams": { + "main": { + "days": { + "2016-05-01": { + "endTime90k": 131595516000000, + "startTime90k": 131587740000000, + "totalDuration90k": 52617609 + }, + "2016-05-02": { + "endTime90k": 131603292000000, + "startTime90k": 131595516000000, + "totalDuration90k": 20946022 + } + }, + "maxEndTime90k": 131598273666690, + "minStartTime90k": 131590386129355, + "retainBytes": 104857600, + "totalDuration90k": 73563631, + "totalSampleFileBytes": 98901406 } }, - "description": "", - "maxEndTime90k": 131598273666690, - "minStartTime90k": 131590386129355, - "retainBytes": 104857600, - "shortName": "driveway", - "totalDuration90k": 73563631, - "totalSampleFileBytes": 98901406 + "shortName": "driveway" } ``` -### `/api/cameras//recordings` +### `/api/cameras///recordings` A GET returns information about recordings, in descending order. @@ -175,7 +186,7 @@ Each recording object has the following properties: Example request URI (with added whitespace between parameters): ``` -/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/recordings +/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/recordings ?startTime90k=130888729442361 &endTime90k=130985466591817 ``` @@ -204,7 +215,7 @@ Example response: } ``` -### `/api/cameras//view.mp4` +### `/api/cameras///view.mp4` A GET returns a `.mp4` file, with an etag and support for range requests. 
The MIME type will be `video/mp4`, with a `codecs` parameter as specified in [RFC @@ -230,27 +241,27 @@ Expected query parameters: Example request URI to retrieve all of recording id 1 from the given camera: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1 + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1 ``` Example request URI to retrieve all of recording ids 1–5 from the given camera, with timestamp subtitles: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1-5&ts=true + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1-5&ts=true ``` Example request URI to retrieve recording id 1, skipping its first 26 90,000ths of a second: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1.26 + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1.26 ``` TODO: error behavior on missing segment. It should be a 404, likely with an `application/json` body describing what portion if any (still) exists. -### `/api/cameras//view.m4s` +### `/api/cameras///view.m4s` A GET returns a `.mp4` suitable for use as a [HTML5 Media Source Extensions media segment][media-segment]. 
The MIME type will be `video/mp4`, with a diff --git a/package.json b/package.json index a009a22..d984618 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "url": "https://github.com/scottlamb/moonfire-nvr/issues" }, "scripts": { - "build": "webpack && ln ui-src/index.html ui-dist/" + "build": "webpack && ln -f ui-src/index.html ui-dist/" }, "dependencies": { "jquery": "^3.2.1", diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs index 9193837..3fd55ec 100644 --- a/src/cmds/config/cameras.rs +++ b/src/cmds/config/cameras.rs @@ -51,14 +51,13 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange { let p = siv.find_id::("password").unwrap().get_content().as_str().into(); let m = siv.find_id::("main_rtsp_path").unwrap().get_content().as_str().into(); let s = siv.find_id::("sub_rtsp_path").unwrap().get_content().as_str().into(); - db::CameraChange{ + db::CameraChange { short_name: sn, description: d, host: h, username: u, password: p, - main_rtsp_path: m, - sub_rtsp_path: s, + rtsp_paths: [m, s], } } @@ -145,8 +144,20 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc, dir: &Arc("confirm").unwrap().get_content(); if decode_size(typed.as_str()).ok() == Some(to_delete) { siv.pop_layer(); // deletion confirmation dialog - if let Err(e) = dir::lower_retention(dir.clone(), - &[dir::NewLimit{camera_id: id, limit: 0}]) { + + let mut zero_limits = Vec::new(); + { + let l = db.lock(); + for (&stream_id, stream) in l.streams_by_id() { + if stream.camera_id == id { + zero_limits.push(dir::NewLimit { + stream_id, + limit: 0, + }); + } + } + } + if let Err(e) = dir::lower_retention(dir.clone(), &zero_limits) { siv.add_layer(views::Dialog::text(format!("Unable to delete recordings: {}", e)) .title("Error") .dismiss_button("Abort")); @@ -162,7 +173,6 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc, dir: &Arc, dir: &Arc, id: i32) { - info!("actually_delete call"); siv.pop_layer(); // get rid of the add/edit camera dialog. 
let result = { let mut l = db.lock(); @@ -198,14 +208,14 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .child(views::DummyView) .child(views::Button::new("Test", |siv| { let c = get_change(siv); - press_test(siv, &c, "main", &c.main_rtsp_path) + press_test(siv, &c, "main", &c.rtsp_paths[0]) }))) .child("sub_rtsp_path", views::LinearLayout::horizontal() .child(views::EditView::new().with_id("sub_rtsp_path").full_width()) .child(views::DummyView) .child(views::Button::new("Test", |siv| { let c = get_change(siv); - press_test(siv, &c, "sub", &c.sub_rtsp_path) + press_test(siv, &c, "sub", &c.rtsp_paths[1]) }))) .min_height(8); let layout = views::LinearLayout::vertical() @@ -214,22 +224,41 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .child(views::TextArea::new().with_id("description").min_height(3)) .full_width(); let mut dialog = views::Dialog::around(layout); - let dialog = if let Some(id) = *item { + let dialog = if let Some(camera_id) = *item { let l = db.lock(); - let camera = l.cameras_by_id().get(&id).expect("missing camera"); + let camera = l.cameras_by_id().get(&camera_id).expect("missing camera"); dialog.find_id("uuid", |v: &mut views::TextView| v.set_content(camera.uuid.to_string())) .expect("missing TextView"); - let bytes = camera.sample_file_bytes; + let mut main_rtsp_path = ""; + let mut sub_rtsp_path = ""; + let mut bytes = 0; + for (_, s) in l.streams_by_id() { + if s.camera_id != camera_id { continue; } + bytes += s.sample_file_bytes; + match s.type_ { + db::StreamType::MAIN => main_rtsp_path = &s.rtsp_path, + db::StreamType::SUB => sub_rtsp_path = &s.rtsp_path, + }; + } let name = camera.short_name.clone(); - for &(view_id, content) in &[("short_name", &camera.short_name), - ("host", &camera.host), - ("username", &camera.username), - ("password", &camera.password), - ("main_rtsp_path", &camera.main_rtsp_path), - ("sub_rtsp_path", &camera.sub_rtsp_path)] { + for &(view_id, content) in &[("short_name", &*camera.short_name), + ("host", 
&*camera.host), + ("username", &*camera.username), + ("password", &*camera.password), + ("main_rtsp_path", main_rtsp_path), + ("sub_rtsp_path", sub_rtsp_path)] { dialog.find_id(view_id, |v: &mut views::EditView| v.set_content(content.to_string())) .expect("missing EditView"); } + for s in l.streams_by_id().values() { + if s.camera_id != camera_id { continue }; + let id = match s.type_ { + db::StreamType::MAIN => "main_rtsp_path", + db::StreamType::SUB => "sub_rtsp_path", + }; + dialog.find_id(id, |v: &mut views::EditView| v.set_content(s.rtsp_path.to_string())) + .expect("missing EditView"); + } dialog.find_id("description", |v: &mut views::TextArea| v.set_content(camera.description.to_string())) .expect("missing TextArea"); @@ -237,12 +266,12 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .button("Edit", { let db = db.clone(); let dir = dir.clone(); - move |s| press_edit(s, &db, &dir, Some(id)) + move |s| press_edit(s, &db, &dir, Some(camera_id)) }) .button("Delete", { let db = db.clone(); let dir = dir.clone(); - move |s| press_delete(s, &db, &dir, id, name.clone(), bytes) + move |s| press_delete(s, &db, &dir, camera_id, name.clone(), bytes) }) } else { dialog.title("Add camera") diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs index 0894e8b..504aaff 100644 --- a/src/cmds/config/mod.rs +++ b/src/cmds/config/mod.rs @@ -144,7 +144,8 @@ pub fn run() -> Result<(), Error> { move |siv, item| item(&db, &dir, siv) }) .item("Edit cameras".to_string(), cameras::add_dialog) - .item("Edit retention".to_string(), retention::add_dialog)) + .item("Edit retention".to_string(), retention::add_dialog) + ) .button("Quit", |siv| siv.quit()) .title("Main menu")); diff --git a/src/cmds/config/retention.rs b/src/cmds/config/retention.rs index f0560a6..1ca4e34 100644 --- a/src/cmds/config/retention.rs +++ b/src/cmds/config/retention.rs @@ -42,7 +42,7 @@ use std::rc::Rc; use std::sync::Arc; use super::{decode_size, encode_size}; -struct Camera { +struct Stream { label: 
String, used: i64, retain: Option, // None if unparseable @@ -55,15 +55,15 @@ struct Model { total_used: i64, total_retain: i64, errors: isize, - cameras: BTreeMap, + streams: BTreeMap, } /// Updates the limits in the database. Doesn't delete excess data (if any). fn update_limits_inner(model: &Model) -> Result<(), Error> { let mut db = model.db.lock(); let mut tx = db.tx()?; - for (&id, camera) in &model.cameras { - tx.update_retention(id, camera.retain.unwrap())?; + for (&id, stream) in &model.streams { + tx.update_retention(id, stream.retain.unwrap())?; } tx.commit() } @@ -77,12 +77,12 @@ fn update_limits(model: &Model, siv: &mut Cursive) { } fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) { - info!("on_edit called for id {}", id); + debug!("on_edit called for id {}", id); let mut model = model.borrow_mut(); let model: &mut Model = &mut *model; - let camera = model.cameras.get_mut(&id).unwrap(); + let stream = model.streams.get_mut(&id).unwrap(); let new_value = decode_size(content).ok(); - let delta = new_value.unwrap_or(0) - camera.retain.unwrap_or(0); + let delta = new_value.unwrap_or(0) - stream.retain.unwrap_or(0); let old_errors = model.errors; if delta != 0 { let prev_over = model.total_retain > model.fs_capacity; @@ -91,7 +91,6 @@ fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) .unwrap() .set_content(encode_size(model.total_retain)); let now_over = model.total_retain > model.fs_capacity; - info!("now_over: {}", now_over); if now_over != prev_over { model.errors += if now_over { 1 } else { -1 }; siv.find_id::("total_ok") @@ -99,13 +98,13 @@ fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) .set_content(if now_over { "*" } else { " " }); } } - if new_value.is_none() != camera.retain.is_none() { + if new_value.is_none() != stream.retain.is_none() { model.errors += if new_value.is_none() { 1 } else { -1 }; siv.find_id::(&format!("{}_ok", id)) .unwrap() .set_content(if 
new_value.is_none() { "*" } else { " " }); } - camera.retain = new_value; + stream.retain = new_value; info!("model.errors = {}", model.errors); if (model.errors == 0) != (old_errors == 0) { info!("toggling change state: errors={}", model.errors); @@ -119,7 +118,7 @@ fn confirm_deletion(model: &RefCell, siv: &mut Cursive, to_delete: i64) { let typed = siv.find_id::("confirm") .unwrap() .get_content(); - info!("confirm, typed: {} vs expected: {}", typed.as_str(), to_delete); + debug!("confirm, typed: {} vs expected: {}", typed.as_str(), to_delete); if decode_size(typed.as_str()).ok() == Some(to_delete) { actually_delete(model, siv); } else { @@ -132,8 +131,8 @@ fn confirm_deletion(model: &RefCell, siv: &mut Cursive, to_delete: i64) { fn actually_delete(model: &RefCell, siv: &mut Cursive) { let model = &*model.borrow(); let new_limits: Vec<_> = - model.cameras.iter() - .map(|(&id, c)| dir::NewLimit{camera_id: id, limit: c.retain.unwrap()}) + model.streams.iter() + .map(|(&id, s)| dir::NewLimit {stream_id: id, limit: s.retain.unwrap()}) .collect(); siv.pop_layer(); // deletion confirmation siv.pop_layer(); // retention dialog @@ -150,11 +149,11 @@ fn press_change(model: &Rc>, siv: &mut Cursive) { if model.borrow().errors > 0 { return; } - let to_delete = model.borrow().cameras.values().map( - |c| ::std::cmp::max(c.used - c.retain.unwrap(), 0)).sum(); - info!("change press, to_delete={}", to_delete); + let to_delete = model.borrow().streams.values().map( + |s| ::std::cmp::max(s.used - s.retain.unwrap(), 0)).sum(); + debug!("change press, to_delete={}", to_delete); if to_delete > 0 { - let prompt = format!("Some cameras' usage exceeds new limit. Please confirm the amount \ + let prompt = format!("Some streams' usage exceeds new limit. 
Please confirm the amount \ of data to delete by typing it back:\n\n{}", encode_size(to_delete)); let dialog = views::Dialog::around( views::LinearLayout::vertical() @@ -179,19 +178,20 @@ fn press_change(model: &Rc>, siv: &mut Cursive) { pub fn add_dialog(db: &Arc, dir: &Arc, siv: &mut Cursive) { let model = { - let mut cameras = BTreeMap::new(); + let mut streams = BTreeMap::new(); let mut total_used = 0; let mut total_retain = 0; { let db = db.lock(); - for (&id, camera) in db.cameras_by_id() { - cameras.insert(id, Camera{ - label: format!("{}: {}", id, camera.short_name), - used: camera.sample_file_bytes, - retain: Some(camera.retain_bytes), + for (&id, s) in db.streams_by_id() { + let c = db.cameras_by_id().get(&s.camera_id).expect("stream without camera"); + streams.insert(id, Stream { + label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()), + used: s.sample_file_bytes, + retain: Some(s.retain_bytes), }); - total_used += camera.sample_file_bytes; - total_retain += camera.retain_bytes; + total_used += s.sample_file_bytes; + total_retain += s.retain_bytes; } } let stat = dir.statfs().unwrap(); @@ -199,27 +199,27 @@ pub fn add_dialog(db: &Arc, dir: &Arc, siv: &m Rc::new(RefCell::new(Model{ dir: dir.clone(), db: db.clone(), - fs_capacity: fs_capacity, - total_used: total_used, - total_retain: total_retain, + fs_capacity, + total_used, + total_retain, errors: (total_retain > fs_capacity) as isize, - cameras: cameras, + streams, })) }; let mut list = views::ListView::new(); list.add_child( - "camera", + "stream", views::LinearLayout::horizontal() .child(views::TextView::new("usage").fixed_width(25)) .child(views::TextView::new("limit").fixed_width(25))); - for (&id, camera) in &model.borrow().cameras { + for (&id, stream) in &model.borrow().streams { list.add_child( - &camera.label, + &stream.label, views::LinearLayout::horizontal() - .child(views::TextView::new(encode_size(camera.used)).fixed_width(25)) + 
.child(views::TextView::new(encode_size(stream.used)).fixed_width(25)) .child(views::EditView::new() - .content(encode_size(camera.retain.unwrap())) + .content(encode_size(stream.retain.unwrap())) .on_edit({ let model = model.clone(); move |siv, content, _pos| edit_limit(&model, siv, id, content) diff --git a/src/cmds/run.rs b/src/cmds/run.rs index bd69b6d..ddfea99 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -98,18 +98,21 @@ pub fn run() -> Result<(), Error> { &args.flag_db_dir, if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?; let db = Arc::new(db::Database::new(conn).unwrap()); + + // TODO: multiple sample file dirs. let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap(); info!("Database is loaded."); let s = web::Service::new(db.clone(), dir.clone(), Some(&args.flag_ui_dir), resolve_zone())?; - // Start a streamer for each camera. + // Start a streamer for each stream. + // TODO: enabled only. let shutdown_streamers = Arc::new(AtomicBool::new(false)); let mut streamers = Vec::new(); let syncer = if !args.flag_read_only { let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap(); let l = db.lock(); - let cameras = l.cameras_by_id().len(); + let streams = l.streams_by_id().len(); let env = streamer::Environment{ db: &db, dir: &dir, @@ -117,12 +120,13 @@ pub fn run() -> Result<(), Error> { opener: &*stream::FFMPEG, shutdown: &shutdown_streamers, }; - for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() { - let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64; + for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { + let camera = l.cameras_by_id().get(&stream.camera_id).unwrap(); + let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, - rotate_offset_sec, + stream, rotate_offset_sec, 
streamer::ROTATE_INTERVAL_SEC); - let name = format!("stream-{}", streamer.short_name()); + let name = format!("s-{}", streamer.short_name()); streamers.push(thread::Builder::new().name(name).spawn(move|| { streamer.run(); }).expect("can't create thread")); diff --git a/src/db.rs b/src/db.rs index 79ed59e..8f7e06d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -65,6 +65,7 @@ use std::cell::RefCell; use std::cmp; use std::io::Write; use std::ops::Range; +use std::mem; use std::str; use std::string::String; use std::sync::Arc; @@ -73,7 +74,7 @@ use time; use uuid::Uuid; /// Expected schema version. See `guide/schema.md` for more information. -pub const EXPECTED_VERSION: i32 = 1; +pub const EXPECTED_VERSION: i32 = 2; const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" select @@ -110,10 +111,10 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" "#; const INSERT_RECORDING_SQL: &'static str = r#" - insert into recording (composite_id, camera_id, run_offset, flags, sample_file_bytes, + insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes, start_time_90k, duration_90k, local_time_delta_90k, video_samples, video_sync_samples, video_sample_entry_id) - values (:composite_id, :camera_id, :run_offset, :flags, :sample_file_bytes, + values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes, :start_time_90k, :duration_90k, :local_time_delta_90k, :video_samples, :video_sync_samples, :video_sample_entry_id) "#; @@ -125,7 +126,7 @@ const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" "#; const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = - "update camera set next_recording_id = :next_recording_id where id = :camera_id"; + "update stream set next_recording_id = :next_recording_id where id = :stream_id"; const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" select @@ -152,24 +153,24 @@ const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#" delete from recording_playback where composite_id = :composite_id "#; -const 
CAMERA_MIN_START_SQL: &'static str = r#" +const STREAM_MIN_START_SQL: &'static str = r#" select start_time_90k from recording where - camera_id = :camera_id + stream_id = :stream_id order by start_time_90k limit 1 "#; -const CAMERA_MAX_START_SQL: &'static str = r#" +const STREAM_MAX_START_SQL: &'static str = r#" select start_time_90k, duration_90k from recording where - camera_id = :camera_id + stream_id = :stream_id order by start_time_90k desc; "#; @@ -236,7 +237,7 @@ pub struct ListRecordingsRow { pub start: recording::Time, pub video_sample_entry: Arc, - pub camera_id: i32, + pub stream_id: i32, pub id: i32, /// This is a recording::Duration, but a single recording's duration fits into an i32. @@ -257,7 +258,7 @@ pub struct ListAggregatedRecordingsRow { pub video_sync_samples: i64, pub sample_file_bytes: i64, pub video_sample_entry: Arc, - pub camera_id: i32, + pub stream_id: i32, pub flags: i32, pub run_start_id: i32, } @@ -286,7 +287,7 @@ pub enum RecordingFlags { /// A recording to pass to `insert_recording`. #[derive(Debug)] pub struct RecordingToInsert { - pub camera_id: i32, + pub stream_id: i32, pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, @@ -304,7 +305,7 @@ pub struct RecordingToInsert { #[derive(Debug)] pub struct ListOldestSampleFilesRow { pub uuid: Uuid, - pub camera_id: i32, + pub stream_id: i32, pub recording_id: i32, pub time: Range, pub sample_file_bytes: i32, @@ -312,11 +313,11 @@ pub struct ListOldestSampleFilesRow { /// A calendar day in `YYYY-mm-dd` format. 
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] -pub struct CameraDayKey([u8; 10]); +pub struct StreamDayKey([u8; 10]); -impl CameraDayKey { +impl StreamDayKey { fn new(tm: time::Tm) -> Result { - let mut s = CameraDayKey([0u8; 10]); + let mut s = StreamDayKey([0u8; 10]); write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?; Ok(s) } @@ -335,13 +336,13 @@ impl CameraDayKey { } } -impl AsRef for CameraDayKey { +impl AsRef for StreamDayKey { fn as_ref(&self) -> &str { str::from_utf8(&self.0[..]).expect("days are always UTF-8") } } /// In-memory state about a particular camera on a particular day. #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct CameraDayValue { +pub struct StreamDayValue { /// The number of recordings that overlap with this day. Note that `adjust_day` automatically /// prunes days with 0 recordings. pub recordings: i64, @@ -362,11 +363,53 @@ pub struct Camera { pub host: String, pub username: String, pub password: String, - pub main_rtsp_path: String, - pub sub_rtsp_path: String, + pub streams: [Option; 2], +} + +#[derive(Copy, Clone, Debug)] +pub enum StreamType { MAIN, SUB } + +impl StreamType { + pub fn from_index(i: usize) -> Option { + match i { + 0 => Some(StreamType::MAIN), + 1 => Some(StreamType::SUB), + _ => None, + } + } + + pub fn index(self) -> usize { + match self { + StreamType::MAIN => 0, + StreamType::SUB => 1, + } + } + + pub fn as_str(self) -> &'static str { + match self { + StreamType::MAIN => "main", + StreamType::SUB => "sub", + } + } + + pub fn parse(type_: &str) -> Option { + match type_ { + "main" => Some(StreamType::MAIN), + "sub" => Some(StreamType::SUB), + _ => None, + } + } +} + +#[derive(Debug)] +pub struct Stream { + pub id: i32, + pub camera_id: i32, + pub type_: StreamType, + pub rtsp_path: String, pub retain_bytes: i64, - /// The time range of recorded data associated with this camera (minimum start time and maximum + /// The time range of recorded data associated with this stream (minimum start 
time and maximum /// end time). `None` iff there are no recordings for this camera. pub range: Option>, pub sample_file_bytes: i64, @@ -376,8 +419,7 @@ pub struct Camera { pub duration: recording::Duration, /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. - pub days: BTreeMap, - + pub days: BTreeMap, next_recording_id: i32, } @@ -389,14 +431,13 @@ pub struct CameraChange { pub host: String, pub username: String, pub password: String, - pub main_rtsp_path: String, - pub sub_rtsp_path: String, + pub rtsp_paths: [String; 2], } /// Adds non-zero `delta` to the day represented by `day` in the map `m`. /// Inserts a map entry if absent; removes the entry if it has 0 entries on exit. -fn adjust_day(day: CameraDayKey, delta: CameraDayValue, - m: &mut BTreeMap) { +fn adjust_day(day: StreamDayKey, delta: StreamDayValue, + m: &mut BTreeMap) { use ::std::collections::btree_map::Entry; match m.entry(day) { Entry::Vacant(e) => { e.insert(delta); }, @@ -421,10 +462,10 @@ fn adjust_day(day: CameraDayKey, delta: CameraDayValue, /// This function swallows/logs date formatting errors because they shouldn't happen and there's /// not much that can be done about them. (The database operation has already gone through.) fn adjust_days(r: Range, sign: i64, - m: &mut BTreeMap) { + m: &mut BTreeMap) { // Find first day key. let mut my_tm = time::at(time::Timespec{sec: r.start.unix_seconds(), nsec: 0}); - let day = match CameraDayKey::new(my_tm) { + let day = match StreamDayKey::new(my_tm) { Ok(d) => d, Err(ref e) => { error!("Unable to fill first day key from {:?}: {}; will ignore.", my_tm, e); @@ -443,7 +484,7 @@ fn adjust_days(r: Range, sign: i64, let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC; // Adjust the first day. 
- let first_day_delta = CameraDayValue{ + let first_day_delta = StreamDayValue{ recordings: sign, duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)), }; @@ -456,21 +497,21 @@ fn adjust_days(r: Range, sign: i64, // Fill day with the second day. This requires a normalized representation so recalculate. // (The C mktime(3) already normalized for us once, but .to_timespec() discarded that result.) let my_tm = time::at(boundary); - let day = match CameraDayKey::new(my_tm) { + let day = match StreamDayKey::new(my_tm) { Ok(d) => d, Err(ref e) => { error!("Unable to fill second day key from {:?}: {}; will ignore.", my_tm, e); return; } }; - let second_day_delta = CameraDayValue{ + let second_day_delta = StreamDayValue{ recordings: sign, duration: recording::Duration(sign * (r.end.0 - boundary_90k)), }; adjust_day(day, second_day_delta, m); } -impl Camera { +impl Stream { /// Adds a single recording with the given properties to the in-memory state. fn add_recording(&mut self, r: Range, sample_file_bytes: i32) { self.range = Some(match self.range { @@ -484,9 +525,10 @@ impl Camera { } /// Initializes the recordings associated with the given camera. 
-fn init_recordings(conn: &mut rusqlite::Connection, camera_id: i32, camera: &mut Camera) +fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Camera, + stream: &mut Stream) -> Result<(), Error> { - info!("Loading recordings for camera {}", camera.short_name); + info!("Loading recordings for camera {} stream {:?}", camera.short_name, stream.type_); let mut stmt = conn.prepare(r#" select recording.start_time_90k, @@ -495,19 +537,19 @@ fn init_recordings(conn: &mut rusqlite::Connection, camera_id: i32, camera: &mut from recording where - camera_id = :camera_id + stream_id = :stream_id "#)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let mut i = 0; while let Some(row) = rows.next() { let row = row?; let start = recording::Time(row.get_checked(0)?); let duration = recording::Duration(row.get_checked(1)?); let bytes = row.get_checked(2)?; - camera.add_recording(start .. start + duration, bytes); + stream.add_recording(start .. start + duration, bytes); i += 1; } - info!("Loaded {} recordings for camera {}", i, camera.short_name); + info!("Loaded {} recordings for camera {} stream {:?}", i, camera.short_name, stream.type_); Ok(()) } @@ -523,6 +565,7 @@ pub struct LockedDatabase { #[derive(Debug)] struct State { cameras_by_id: BTreeMap, + streams_by_id: BTreeMap, cameras_by_uuid: BTreeMap, video_sample_entries: BTreeMap>, list_recordings_by_time_sql: String, @@ -533,7 +576,7 @@ struct State { /// be applied to the in-memory state on successful commit. pub struct Transaction<'a> { state: &'a mut State, - mods_by_camera: fnv::FnvHashMap, + mods_by_stream: fnv::FnvHashMap, tx: rusqlite::Transaction<'a>, /// True if due to an earlier error the transaction must be rolled back rather than committed. 
@@ -548,8 +591,8 @@ pub struct Transaction<'a> {
 pub bypass_reservation_for_testing: bool,
 }

-/// A modification to be done to a `Camera` after a `Transaction` is committed.
-struct CameraModification {
+/// A modification to be done to a `Stream` after a `Transaction` is committed.
+struct StreamModification {
 /// Add this to `stream.duration`. Thus, positive values indicate a net addition;
 /// negative values indicate a net subtraction.
 duration: recording::Duration,
@@ -557,10 +600,10 @@ struct CameraModification {
 /// Add this to `stream.sample_file_bytes`.
 sample_file_bytes: i64,

- /// Add this to `camera.days`.
- days: BTreeMap,
+ /// Add this to `stream.days`.
+ days: BTreeMap,

- /// Reset the Camera range to this value. This should be populated immediately prior to the
+ /// Reset the Stream range to this value. This should be populated immediately prior to the
 /// commit.
 range: Option>,

@@ -571,8 +614,8 @@ struct CameraModification {
 new_retain_bytes: Option,
 }

-fn composite_id(camera_id: i32, recording_id: i32) -> i64 {
- (camera_id as i64) << 32 | recording_id as i64
+fn composite_id(stream_id: i32, recording_id: i32) -> i64 {
+ (stream_id as i64) << 32 | recording_id as i64
 }

 impl<'a> Transaction<'a> {
@@ -601,23 +644,23 @@ impl<'a> Transaction<'a> {
 self.check_must_rollback()?;
 self.must_rollback = true;
 for row in rows {
- let composite_id = &composite_id(row.camera_id, row.recording_id);
+ let composite_id = &composite_id(row.stream_id, row.recording_id);
 let changes = del1.execute_named(&[(":composite_id", composite_id)])?;
 if changes != 1 {
 return Err(Error::new(format!("no such recording {}/{} (uuid {})",
- row.camera_id, row.recording_id, row.uuid)));
+ row.stream_id, row.recording_id, row.uuid)));
 }
 let changes = del2.execute_named(&[(":composite_id", composite_id)])?;
 if changes != 1 {
 return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})",
- row.camera_id, row.recording_id, 
+ row.stream_id, row.recording_id, 
row.uuid)));
 }
 let uuid = &row.uuid.as_bytes()[..];
 insert.execute_named(&[
 (":uuid", &uuid),
 (":state", &(ReservationState::Deleting as i64))
 ])?;
- let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, row.camera_id);
+ let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.stream_id);
 m.duration -= row.time.end - row.time.start;
 m.sample_file_bytes -= row.sample_file_bytes as i64;
 adjust_days(row.time.clone(), -1, &mut m.days);
@@ -656,9 +699,10 @@ impl<'a> Transaction<'a> {
 }

 // Unreserve the sample file uuid and insert the recording row.
- let cam = match self.state.cameras_by_id.get_mut(&r.camera_id) {
- None => return Err(Error::new(format!("no such camera id {}", r.camera_id))),
- Some(c) => c,
+ // Look up the stream's in-memory state; `stream.next_recording_id` is read below when
+ let stream = match self.state.streams_by_id.get_mut(&r.stream_id) {
+ None => return Err(Error::new(format!("no such stream id {}", r.stream_id))),
+ Some(s) => s,
 };
 let uuid = &r.sample_file_uuid.as_bytes()[..];
 {
@@ -669,15 +713,15 @@ impl<'a> Transaction<'a> {
 }
 }
 self.must_rollback = true;
- let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
+ let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.stream_id);
 let recording_id;
 {
- recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id);
- let composite_id = composite_id(r.camera_id, recording_id);
+ recording_id = m.new_next_recording_id.unwrap_or(stream.next_recording_id);
+ let composite_id = composite_id(r.stream_id, recording_id);
 let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?;
 stmt.execute_named(&[
 (":composite_id", &composite_id),
- (":camera_id", &(r.camera_id as i64)),
+ (":stream_id", &(r.stream_id as i64)),
 (":run_offset", &r.run_offset),
 (":flags", &r.flags),
 (":sample_file_bytes", &r.sample_file_bytes),
@@ -699,7 +743,7 @@ impl<'a> Transaction<'a> {
 ])?;
 let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
 stmt.execute_named(&[
- (":camera_id", &(r.camera_id as 
i64)), + (":stream_id", &(r.stream_id as i64)), (":next_recording_id", &m.new_next_recording_id), ])?; } @@ -710,22 +754,22 @@ impl<'a> Transaction<'a> { Ok(recording_id) } - /// Updates the `retain_bytes` for the given camera to the specified limit. + /// Updates the `retain_bytes` for the given stream to the specified limit. /// Note this just resets the limit in the database; it's the caller's responsibility to ensure /// current usage is under the new limit if desired. - pub fn update_retention(&mut self, camera_id: i32, new_limit: i64) -> Result<(), Error> { + pub fn update_retention(&mut self, stream_id: i32, new_limit: i64) -> Result<(), Error> { if new_limit < 0 { - return Err(Error::new(format!("can't set limit for camera {} to {}; must be >= 0", - camera_id, new_limit))); + return Err(Error::new(format!("can't set limit for stream {} to {}; must be >= 0", + stream_id, new_limit))); } self.check_must_rollback()?; let mut stmt = - self.tx.prepare_cached("update camera set retain_bytes = :retain where id = :id")?; - let changes = stmt.execute_named(&[(":retain", &new_limit), (":id", &camera_id)])?; + self.tx.prepare_cached("update stream set retain_bytes = :retain where id = :id")?; + let changes = stmt.execute_named(&[(":retain", &new_limit), (":id", &stream_id)])?; if changes != 1 { - return Err(Error::new(format!("no such camera {}", camera_id))); + return Err(Error::new(format!("no such stream {}", stream_id))); } - let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, camera_id); + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id); m.new_retain_bytes = Some(new_limit); Ok(()) } @@ -735,20 +779,20 @@ impl<'a> Transaction<'a> { self.check_must_rollback()?; self.precommit()?; self.tx.commit()?; - for (&camera_id, m) in &self.mods_by_camera { - let camera = self.state.cameras_by_id.get_mut(&camera_id) - .expect("modified camera must exist"); - camera.duration += m.duration; - camera.sample_file_bytes += 
m.sample_file_bytes; + for (&stream_id, m) in &self.mods_by_stream { + let stream = self.state.streams_by_id.get_mut(&stream_id) + .expect("modified stream must exist"); + stream.duration += m.duration; + stream.sample_file_bytes += m.sample_file_bytes; for (k, v) in &m.days { - adjust_day(*k, *v, &mut camera.days); + adjust_day(*k, *v, &mut stream.days); } - camera.range = m.range.clone(); + stream.range = m.range.clone(); if let Some(id) = m.new_next_recording_id { - camera.next_recording_id = id; + stream.next_recording_id = id; } if let Some(b) = m.new_retain_bytes { - camera.retain_bytes = b; + stream.retain_bytes = b; } } Ok(()) @@ -762,11 +806,11 @@ impl<'a> Transaction<'a> { Ok(()) } - /// Looks up an existing entry in `mods` for a given camera or makes+inserts an identity entry. - fn get_mods_by_camera(mods: &mut fnv::FnvHashMap, camera_id: i32) - -> &mut CameraModification { - mods.entry(camera_id).or_insert_with(|| { - CameraModification{ + /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry. + fn get_mods_by_stream(mods: &mut fnv::FnvHashMap, stream_id: i32) + -> &mut StreamModification { + mods.entry(stream_id).or_insert_with(|| { + StreamModification{ duration: recording::Duration(0), sample_file_bytes: 0, range: None, @@ -777,14 +821,14 @@ impl<'a> Transaction<'a> { }) } - /// Fills the `range` of each `CameraModification`. This is done prior to commit so that if the + /// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the /// commit succeeds, there's no possibility that the correct state can't be retrieved. fn precommit(&mut self) -> Result<(), Error> { // Recompute start and end times for each camera. - for (&camera_id, m) in &mut self.mods_by_camera { + for (&stream_id, m) in &mut self.mods_by_stream { // The minimum is straightforward, taking advantage of the start_time_90k index. 
- let mut stmt = self.tx.prepare_cached(CAMERA_MIN_START_SQL)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut stmt = self.tx.prepare_cached(STREAM_MIN_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let min_start = match rows.next() { Some(row) => recording::Time(row?.get_checked(0)?), None => continue, // no data; leave m.range alone. @@ -794,8 +838,8 @@ impl<'a> Transaction<'a> { // straightforward because recordings could overlap. All recordings starting in the // last MAX_RECORDING_DURATION must be examined in order to take advantage of the // start_time_90k index. - let mut stmt = self.tx.prepare_cached(CAMERA_MAX_START_SQL)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut stmt = self.tx.prepare_cached(STREAM_MAX_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let mut maxes_opt = None; while let Some(row) = rows.next() { let row = row?; @@ -814,8 +858,8 @@ impl<'a> Transaction<'a> { let max_end = match maxes_opt { Some(Range{end: e, ..}) => e, None => { - return Err(Error::new(format!("missing max for camera {} which had min {}", - camera_id, min_start))); + return Err(Error::new(format!("missing max for stream {} which had min {}", + stream_id, min_start))); } }; m.range = Some(min_start .. 
max_end); @@ -824,9 +868,54 @@ impl<'a> Transaction<'a> { } } +struct StreamInserter<'tx> { + tx: &'tx rusqlite::Transaction<'tx>, + stmt: rusqlite::Statement<'tx>, + new_streams: BTreeMap, +} + +impl<'tx> StreamInserter<'tx> { + fn new(tx: &'tx rusqlite::Transaction) -> Result { + let stmt = tx.prepare(r#" + insert into stream (camera_id, type, rtsp_path, retain_bytes, next_recording_id) + values (:camera_id, :type, :rtsp_path, 0, 1) + "#)?; + Ok(StreamInserter { + tx, + stmt, + new_streams: BTreeMap::new(), + }) + } + + fn add(&mut self, camera_id: i32, type_: StreamType, rtsp_path: String) -> Result<(), Error> { + self.stmt.execute_named(&[ + (":camera_id", &camera_id), + (":type", &type_.as_str()), + (":rtsp_path", &rtsp_path) + ])?; + let id = self.tx.last_insert_rowid() as i32; + self.new_streams.insert(id, Stream { + id, + type_, + camera_id, + rtsp_path, + retain_bytes: 0, + range: None, + sample_file_bytes: 0, + duration: recording::Duration(0), + days: BTreeMap::new(), + next_recording_id: 1, + }); + Ok(()) + } + + fn streams(self) -> BTreeMap { self.new_streams } +} + impl LockedDatabase { /// Returns an immutable view of the cameras by id. pub fn cameras_by_id(&self) -> &BTreeMap { &self.state.cameras_by_id } + pub fn streams_by_id(&self) -> &BTreeMap { &self.state.streams_by_id } /// Returns an immutable view of the video sample entries. pub fn video_sample_entries(&self) -> btree_map::Values> { @@ -840,7 +929,7 @@ impl LockedDatabase { pub fn tx(&mut self) -> Result { Ok(Transaction{ state: &mut self.state, - mods_by_camera: fnv::FnvHashMap::default(), + mods_by_stream: fnv::FnvHashMap::default(), tx: self.conn.transaction()?, must_rollback: false, bypass_reservation_for_testing: false, @@ -857,30 +946,30 @@ impl LockedDatabase { /// Lists the specified recordings in ascending order by start time, passing them to a supplied /// function. Given that the function is called with the database lock held, it should be quick. 
- pub fn list_recordings_by_time(&self, camera_id: i32, desired_time: Range,
+ pub fn list_recordings_by_time(&self, stream_id: i32, desired_time: Range,
 f: F) -> Result<(), Error>
 where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
 let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?;
 let rows = stmt.query_named(&[
- (":camera_id", &camera_id),
+ (":stream_id", &stream_id),
 (":start_time_90k", &desired_time.start.0),
 (":end_time_90k", &desired_time.end.0)])?;
- self.list_recordings_inner(camera_id, rows, f)
+ self.list_recordings_inner(stream_id, rows, f)
 }

 /// Lists the specified recordings in ascending order by id.
- pub fn list_recordings_by_id(&self, camera_id: i32, desired_ids: Range, f: F)
+ pub fn list_recordings_by_id(&self, stream_id: i32, desired_ids: Range, f: F)
 -> Result<(), Error>
 where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
 let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
 let rows = stmt.query_named(&[
- (":start", &composite_id(camera_id, desired_ids.start)),
- (":end", &composite_id(camera_id, desired_ids.end)),
+ (":start", &composite_id(stream_id, desired_ids.start)),
+ (":end", &composite_id(stream_id, desired_ids.end)),
 ])?;
- self.list_recordings_inner(camera_id, rows, f)
+ self.list_recordings_inner(stream_id, rows, f)
 }

- fn list_recordings_inner(&self, camera_id: i32, mut rows: rusqlite::Rows, mut f: F)
+ fn list_recordings_inner(&self, stream_id: i32, mut rows: rusqlite::Rows, mut f: F)
 -> Result<(), Error>
 where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
 while let Some(row) = rows.next() {
@@ -892,12 +981,12 @@ impl LockedDatabase {
 None => {
 return Err(Error::new(format!(
 "recording {}/{} references nonexistent video_sample_entry {}",
- camera_id, id, vse_id)));
+ stream_id, id, vse_id)));
 },
 };
 let out = ListRecordingsRow{
- camera_id: camera_id,
- id: id,
+ stream_id,
+ id,
 run_offset: row.get_checked(1)?,
 flags: row.get_checked(2)?,
 start: 
recording::Time(row.get_checked(3)?), @@ -915,7 +1004,7 @@ impl LockedDatabase { /// Calls `list_recordings_by_time` and aggregates consecutive recordings. /// Rows are given to the callback in arbitrary order. Callers which care about ordering /// should do their own sorting. - pub fn list_aggregated_recordings(&self, camera_id: i32, + pub fn list_aggregated_recordings(&self, stream_id: i32, desired_time: Range, forced_split: recording::Duration, mut f: F) -> Result<(), Error> @@ -934,7 +1023,7 @@ impl LockedDatabase { // their timestamps overlap. Tracking all active runs prevents that interleaving from // causing problems.) let mut aggs: BTreeMap = BTreeMap::new(); - self.list_recordings_by_time(camera_id, desired_time, |row| { + self.list_recordings_by_time(stream_id, desired_time, |row| { let run_start_id = row.id - row.run_offset; let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let new_dur = a.time.end - a.time.start + @@ -951,8 +1040,8 @@ impl LockedDatabase { let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) { if a.time.end != row.start { return Err(Error::new(format!( - "camera {} recording {} ends at {}; {} starts at {}; expected same", - camera_id, a.ids.end - 1, a.time.end, row.id, row.start))); + "stream {} recording {} ends at {}; {} starts at {}; expected same", + stream_id, a.ids.end - 1, a.time.end, row.id, row.start))); } a.time.end.0 += row.duration_90k as i64; a.ids.end = row.id + 1; @@ -971,7 +1060,7 @@ impl LockedDatabase { video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, video_sample_entry: row.video_sample_entry, - camera_id: camera_id, + stream_id, run_start_id: row.id - row.run_offset, flags: row.flags, }); @@ -987,16 +1076,16 @@ impl LockedDatabase { /// Calls `f` with a single `recording_playback` row. /// Note the lock is held for the duration of `f`. /// This uses a LRU cache to reduce the number of retrievals from the database. 
- pub fn with_recording_playback(&self, camera_id: i32, recording_id: i32, f: F) + pub fn with_recording_playback(&self, stream_id: i32, recording_id: i32, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { - let composite_id = composite_id(camera_id, recording_id); + let composite_id = composite_id(stream_id, recording_id); let mut cache = self.state.playback_cache.borrow_mut(); if let Some(r) = cache.get_mut(&composite_id) { - trace!("cache hit for recording {}/{}", camera_id, recording_id); + trace!("cache hit for recording {}/{}", stream_id, recording_id); return f(&RecordingPlayback::new(r)); } - trace!("cache miss for recording {}/{}", camera_id, recording_id); + trace!("cache miss for recording {}/{}", stream_id, recording_id); let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?; let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?; if let Some(row) = rows.next() { @@ -1011,7 +1100,7 @@ impl LockedDatabase { cache.insert(composite_id, data); return result; } - Err(Error::new(format!("no such recording {}/{}", camera_id, recording_id))) + Err(Error::new(format!("no such recording {}/{}", stream_id, recording_id))) } /// Lists all reserved sample files. @@ -1029,12 +1118,12 @@ impl LockedDatabase { /// Lists the oldest sample files (to delete to free room). /// `f` should return true as long as further rows are desired. 
- pub fn list_oldest_sample_files(&self, camera_id: i32, mut f: F) -> Result<(), Error> + pub fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> where F: FnMut(ListOldestSampleFilesRow) -> bool { let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut rows = stmt.query_named(&[ - (":start", &composite_id(camera_id, 0)), - (":end", &composite_id(camera_id + 1, 0)), + (":start", &composite_id(stream_id, 0)), + (":end", &composite_id(stream_id + 1, 0)), ])?; while let Some(row) = rows.next() { let row = row?; @@ -1044,7 +1133,7 @@ impl LockedDatabase { let uuid: FromSqlUuid = row.get_checked(1)?; let should_continue = f(ListOldestSampleFilesRow{ recording_id: composite_id as i32, - camera_id: (composite_id >> 32) as i32, + stream_id: (composite_id >> 32) as i32, uuid: uuid.0, time: start .. start + duration, sample_file_bytes: row.get_checked(4)?, @@ -1106,17 +1195,13 @@ impl LockedDatabase { info!("Loading cameras"); let mut stmt = self.conn.prepare(r#" select - camera.id, - camera.uuid, - camera.short_name, - camera.description, - camera.host, - camera.username, - camera.password, - camera.main_rtsp_path, - camera.sub_rtsp_path, - camera.retain_bytes, - next_recording_id + id, + uuid, + short_name, + description, + host, + username, + password from camera; "#)?; @@ -1125,7 +1210,7 @@ impl LockedDatabase { let row = row?; let id = row.get_checked(0)?; let uuid: FromSqlUuid = row.get_checked(1)?; - self.state.cameras_by_id.insert(id, Camera{ + self.state.cameras_by_id.insert(id, Camera { id: id, uuid: uuid.0, short_name: row.get_checked(2)?, @@ -1133,14 +1218,7 @@ impl LockedDatabase { host: row.get_checked(4)?, username: row.get_checked(5)?, password: row.get_checked(6)?, - main_rtsp_path: row.get_checked(7)?, - sub_rtsp_path: row.get_checked(8)?, - retain_bytes: row.get_checked(9)?, - range: None, - sample_file_bytes: 0, - duration: recording::Duration(0), - days: BTreeMap::new(), - next_recording_id: 
row.get_checked(10)?, + streams: Default::default(), }); self.state.cameras_by_uuid.insert(uuid.0, id); } @@ -1148,6 +1226,49 @@ impl LockedDatabase { Ok(()) } + /// Initializes the streams, but not their matching recordings. + /// To be called during construction. + fn init_streams(&mut self) -> Result<(), Error> { + info!("Loading streams"); + let mut stmt = self.conn.prepare(r#" + select + id, + type, + camera_id, + rtsp_path, + retain_bytes, + next_recording_id + from + stream; + "#)?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id = row.get_checked(0)?; + let type_: String = row.get_checked(1)?; + let type_ = StreamType::parse(&type_).ok_or_else( + || Error::new(format!("no such stream type {}", type_)))?; + let camera_id = row.get_checked(2)?; + self.state.streams_by_id.insert(id, Stream { + id, + type_, + camera_id, + rtsp_path: row.get_checked(3)?, + retain_bytes: row.get_checked(4)?, + range: None, + sample_file_bytes: 0, + duration: recording::Duration(0), + days: BTreeMap::new(), + next_recording_id: row.get_checked(5)?, + }); + let c = self.state.cameras_by_id.get_mut(&camera_id) + .ok_or_else(|| Error::new("missing camera".to_owned()))?; + c.streams[type_.index()] = Some(id); + } + info!("Loaded {} streams", self.state.streams_by_id.len()); + Ok(()) + } + /// Inserts the specified video sample entry if absent. /// On success, returns the id of a new or existing row. pub fn insert_video_sample_entry(&mut self, w: u16, h: u16, data: Vec, @@ -1192,97 +1313,162 @@ impl LockedDatabase { } /// Adds a camera. 
- pub fn add_camera(&mut self, camera: CameraChange) -> Result { + pub fn add_camera(&mut self, mut camera: CameraChange) -> Result { let uuid = Uuid::new_v4(); let uuid_bytes = &uuid.as_bytes()[..]; - let mut stmt = self.conn.prepare_cached(r#" - insert into camera (uuid, short_name, description, host, username, password, - main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id) - values (:uuid, :short_name, :description, :host, :username, :password, - :main_rtsp_path, :sub_rtsp_path, 0, 1) - "#)?; - stmt.execute_named(&[ - (":uuid", &uuid_bytes), - (":short_name", &camera.short_name), - (":description", &camera.description), - (":host", &camera.host), - (":username", &camera.username), - (":password", &camera.password), - (":main_rtsp_path", &camera.main_rtsp_path), - (":sub_rtsp_path", &camera.sub_rtsp_path), - ])?; - let id = self.conn.last_insert_rowid() as i32; - self.state.cameras_by_id.insert(id, Camera{ - id: id, - uuid: uuid, + let tx = self.conn.transaction()?; + let mut new_streams; + let camera_id; + { + let mut stmt = tx.prepare_cached(r#" + insert into camera (uuid, short_name, description, host, username, password) + values (:uuid, :short_name, :description, :host, :username, :password) + "#)?; + stmt.execute_named(&[ + (":uuid", &uuid_bytes), + (":short_name", &camera.short_name), + (":description", &camera.description), + (":host", &camera.host), + (":username", &camera.username), + (":password", &camera.password), + ])?; + camera_id = tx.last_insert_rowid() as i32; + let mut inserter = StreamInserter::new(&tx)?; + for (i, ref mut rtsp_path) in camera.rtsp_paths.iter_mut().enumerate() { + if rtsp_path.is_empty() { continue; } + inserter.add(camera_id, StreamType::from_index(i).unwrap(), + mem::replace(rtsp_path, String::new()))?; + } + new_streams = inserter.streams(); + } + tx.commit()?; + let mut streams = [None, None]; + for (&id, s) in &new_streams { + streams[s.type_.index()] = Some(id); + } + self.state.streams_by_id.append(&mut 
new_streams); + self.state.cameras_by_id.insert(camera_id, Camera { + id: camera_id, + uuid, short_name: camera.short_name, description: camera.description, host: camera.host, username: camera.username, password: camera.password, - main_rtsp_path: camera.main_rtsp_path, - sub_rtsp_path: camera.sub_rtsp_path, - retain_bytes: 0, - range: None, - sample_file_bytes: 0, - duration: recording::Duration(0), - days: BTreeMap::new(), - next_recording_id: 1, + streams, }); - self.state.cameras_by_uuid.insert(uuid, id); - Ok(id) + self.state.cameras_by_uuid.insert(uuid, camera_id); + Ok(camera_id) } /// Updates a camera. - pub fn update_camera(&mut self, id: i32, camera: CameraChange) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(r#" - update camera set - short_name = :short_name, - description = :description, - host = :host, - username = :username, - password = :password, - main_rtsp_path = :main_rtsp_path, - sub_rtsp_path = :sub_rtsp_path - where - id = :id - "#)?; - stmt.execute_named(&[ - (":id", &id), - (":short_name", &camera.short_name), - (":description", &camera.description), - (":host", &camera.host), - (":username", &camera.username), - (":password", &camera.password), - (":main_rtsp_path", &camera.main_rtsp_path), - (":sub_rtsp_path", &camera.sub_rtsp_path), - ])?; - let c = self.state.cameras_by_id.get_mut(&id).unwrap(); + pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> { + let tx = self.conn.transaction()?; + let mut new_streams; + let mut stream_rtsp_changes = BTreeMap::new(); + { + let mut stream_ids = [None; 2]; + let mut stream_update_stmt = tx.prepare_cached(r#" + update stream set + rtsp_path = :rtsp_path + where + id = :id + "#)?; + for (&stream_id, stream) in &self.state.streams_by_id { + if stream.camera_id != camera_id { + continue; + } + stream_ids[stream.type_.index()] = Some(stream_id); + let p = mem::replace(&mut camera.rtsp_paths[stream.type_.index()], String::new()); + let rows = 
stream_update_stmt.execute_named(&[ + (":id", &stream_id), + (":rtsp_path", &p), + ])?; + if rows != 1 { + return Err(Error::new(format!("Stream {} missing from database", + stream_id))); + } + stream_rtsp_changes.insert(stream_id, p); + } + let mut inserter = StreamInserter::new(&tx)?; + for (index, id) in stream_ids.iter().enumerate() { + if id.is_none() && !camera.rtsp_paths[index].is_empty() { + inserter.add(camera_id, StreamType::from_index(index).unwrap(), + mem::replace(&mut camera.rtsp_paths[index], String::new()))?; + } + } + new_streams = inserter.streams(); + let mut stmt = tx.prepare_cached(r#" + update camera set + short_name = :short_name, + description = :description, + host = :host, + username = :username, + password = :password + where + id = :id + "#)?; + let rows = stmt.execute_named(&[ + (":id", &camera_id), + (":short_name", &camera.short_name), + (":description", &camera.description), + (":host", &camera.host), + (":username", &camera.username), + (":password", &camera.password), + ])?; + if rows != 1 { + return Err(Error::new(format!("Camera {} missing from database", camera_id))); + } + } + tx.commit()?; + let c = self.state.cameras_by_id.get_mut(&camera_id).unwrap(); c.short_name = camera.short_name; c.description = camera.description; c.host = camera.host; c.username = camera.username; c.password = camera.password; - c.main_rtsp_path = camera.main_rtsp_path; - c.sub_rtsp_path = camera.sub_rtsp_path; + for (&id, s) in &new_streams { + c.streams[s.type_.index()] = Some(id); + } + self.state.streams_by_id.append(&mut new_streams); + for (id, p) in &mut stream_rtsp_changes { + let mut s = self.state.streams_by_id.get_mut(id) + .ok_or_else(|| Error::new(format!("stream {} missing", id)))?; + mem::swap(&mut s.rtsp_path, p); + } Ok(()) } - /// Deletes a camera. The camera must have no recordings. + /// Deletes a camera and its streams. The camera must have no recordings. 
 pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> {
- let (has_recordings, uuid) =
- self.state.cameras_by_id.get(&id)
- .map(|c| (c.range.is_some(), c.uuid))
- .ok_or_else(|| Error::new(format!("No such camera {} to remove", id)))?;
- if has_recordings {
- return Err(Error::new(format!("Can't remove camera {}; has recordings.", id)));
- };
- let mut stmt = self.conn.prepare_cached(r"delete from camera where id = :id")?;
- let rows = stmt.execute_named(&[(":id", &id)])?;
- if rows != 1 {
- return Err(Error::new(format!("Camera {} missing from database", id)));
+ let uuid = self.state.cameras_by_id.get(&id)
+ .map(|c| c.uuid)
+ .ok_or_else(|| Error::new(format!("No such camera {} to remove", id)))?;
+ let mut streams_to_delete = Vec::new();
+ let tx = self.conn.transaction()?;
+ {
+ let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?;
+ for (stream_id, stream) in &self.state.streams_by_id {
+ if stream.camera_id != id { continue };
+ if stream.range.is_some() {
+ return Err(Error::new(format!("Can't remove camera {}; has recordings.", id)));
+ }
+ let rows = stream_stmt.execute_named(&[(":id", stream_id)])?;
+ if rows != 1 {
+ return Err(Error::new(format!("Stream {} missing from database", stream_id)));
+ }
+ streams_to_delete.push(*stream_id);
+ }
+ let mut cam_stmt = tx.prepare_cached(r"delete from camera where id = :id")?;
+ let rows = cam_stmt.execute_named(&[(":id", &id)])?;
+ if rows != 1 {
+ return Err(Error::new(format!("Camera {} missing from database", id)));
+ }
+ }
+ tx.commit()?;
+ for id in streams_to_delete {
+ self.state.streams_by_id.remove(&id);
 }
- self.state.cameras_by_id.remove(&id);
 self.state.cameras_by_uuid.remove(&uuid);
 return Ok(())
 }
@@ -1326,7 +1512,7 @@ impl Database {
 from
 recording
 where
- camera_id = :camera_id and
+ stream_id = :stream_id and
 recording.start_time_90k > :start_time_90k - {} and
 recording.start_time_90k < :end_time_90k and
 recording.start_time_90k + recording.duration_90k > 
:start_time_90k @@ -1357,9 +1543,10 @@ impl Database { } let db = Database(Mutex::new(LockedDatabase{ conn: conn, - state: State{ + state: State { cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), + streams_by_id: BTreeMap::new(), video_sample_entries: BTreeMap::new(), playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), list_recordings_by_time_sql: list_recordings_by_time_sql, @@ -1369,9 +1556,11 @@ impl Database { let l = &mut *db.lock(); l.init_video_sample_entries().annotate_err("init_video_sample_entries")?; l.init_cameras().annotate_err("init_cameras")?; - for (&camera_id, ref mut camera) in &mut l.state.cameras_by_id { - // TODO: we could use one thread per camera if we had multiple db conns. - init_recordings(&mut l.conn, camera_id, camera) + l.init_streams().annotate_err("init_streams")?; + for (&stream_id, ref mut stream) in &mut l.state.streams_by_id { + // TODO: we could use one thread per stream if we had multiple db conns. + let camera = l.state.cameras_by_id.get(&stream.camera_id).unwrap(); + init_recordings(&mut l.conn, stream_id, camera, stream) .annotate_err("init_recordings")?; } } @@ -1422,21 +1611,22 @@ mod tests { assert_eq!("test-camera", row.host); assert_eq!("foo", row.username); assert_eq!("bar", row.password); - assert_eq!("/main", row.main_rtsp_path); - assert_eq!("/sub", row.sub_rtsp_path); - assert_eq!(42, row.retain_bytes); - assert_eq!(None, row.range); - assert_eq!(recording::Duration(0), row.duration); - assert_eq!(0, row.sample_file_bytes); + //assert_eq!("/main", row.main_rtsp_path); + //assert_eq!("/sub", row.sub_rtsp_path); + //assert_eq!(42, row.retain_bytes); + //assert_eq!(None, row.range); + //assert_eq!(recording::Duration(0), row.duration); + //assert_eq!(0, row.sample_file_bytes); } } assert_eq!(1, rows); + let stream_id = camera_id; // TODO rows = 0; { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. 
recording::Time(i64::max_value()); - db.list_recordings_by_time(camera_id, all_time, |_row| { + db.list_recordings_by_time(stream_id, all_time, |_row| { rows += 1; Ok(()) }).unwrap(); @@ -1444,30 +1634,24 @@ mod tests { assert_eq!(0, rows); } - fn assert_single_recording(db: &Database, camera_uuid: Uuid, r: &RecordingToInsert) { - let mut rows = 0; - let mut camera_id = -1; + fn assert_single_recording(db: &Database, stream_id: i32, r: &RecordingToInsert) { { let db = db.lock(); - for row in db.cameras_by_id().values() { - rows += 1; - camera_id = row.id; - assert_eq!(camera_uuid, row.uuid); - assert_eq!(Some(r.time.clone()), row.range); - assert_eq!(r.sample_file_bytes as i64, row.sample_file_bytes); - assert_eq!(r.time.end - r.time.start, row.duration); - } + let stream = db.streams_by_id().get(&stream_id).unwrap(); + assert_eq!(Some(r.time.clone()), stream.range); + assert_eq!(r.sample_file_bytes as i64, stream.sample_file_bytes); + assert_eq!(r.time.end - r.time.start, stream.duration); + db.cameras_by_id().get(&stream.camera_id).unwrap(); } - assert_eq!(1, rows); // TODO(slamb): test that the days logic works correctly. - rows = 0; + let mut rows = 0; let mut recording_id = -1; { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. 
recording::Time(i64::max_value()); - db.list_recordings_by_time(camera_id, all_time, |row| { + db.list_recordings_by_time(stream_id, all_time, |row| { rows += 1; recording_id = row.id; assert_eq!(r.time, @@ -1482,7 +1666,7 @@ mod tests { assert_eq!(1, rows); rows = 0; - db.lock().list_oldest_sample_files(camera_id, |row| { + db.lock().list_oldest_sample_files(stream_id, |row| { rows += 1; assert_eq!(recording_id, row.recording_id); assert_eq!(r.sample_file_uuid, row.uuid); @@ -1514,21 +1698,21 @@ mod tests { let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC); let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC); let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC); - let test_day1 = &CameraDayKey(*b"2015-12-31"); - let test_day2 = &CameraDayKey(*b"2016-01-01"); + let test_day1 = &StreamDayKey(*b"2015-12-31"); + let test_day2 = &StreamDayKey(*b"2016-01-01"); adjust_days(test_time .. test_time + one_min, 1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); // Add to a day. adjust_days(test_time .. test_time + one_min, 1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); // Subtract from a day. adjust_days(test_time .. test_time + one_min, -1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); // Remove a day. adjust_days(test_time .. test_time + one_min, -1, &mut m); @@ -1537,20 +1721,20 @@ mod tests { // Create two days. adjust_days(test_time .. 
test_time + three_min, 1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); // Add to two days. adjust_days(test_time .. test_time + three_min, 1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: four_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: four_min}), m.get(test_day2)); // Subtract from two days. adjust_days(test_time .. test_time + three_min, -1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); // Remove two days. adjust_days(test_time .. test_time + three_min, -1, &mut m); @@ -1560,11 +1744,11 @@ mod tests { #[test] fn test_day_bounds() { testutil::init(); - assert_eq!(CameraDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs) + assert_eq!(StreamDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs) recording::Time(135685692000000) .. recording::Time(135693468000000)); - assert_eq!(CameraDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs) + assert_eq!(StreamDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs) recording::Time(134037504000000) .. 
recording::Time(134044956000000)); - assert_eq!(CameraDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs) + assert_eq!(StreamDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs) recording::Time(135887868000000) .. recording::Time(135895968000000)); } @@ -1579,10 +1763,10 @@ mod tests { fn test_version_too_old() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (0, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 0 is too old (expected 1)"), "got: {:?}", + "Database schema version 1 is too old (expected 2)"), "got: {:?}", e.description()); } @@ -1590,10 +1774,10 @@ mod tests { fn test_version_too_new() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (3, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 2 is too new (expected 1)"), "got: {:?}", e.description()); + "Database schema version 3 is too new (expected 2)"), "got: {:?}", e.description()); } /// Basic test of running some queries on a fresh database. 
@@ -1618,13 +1802,16 @@ mod tests { host: "test-camera".to_owned(), username: "foo".to_owned(), password: "bar".to_owned(), - main_rtsp_path: "/main".to_owned(), - sub_rtsp_path: "/sub".to_owned(), + rtsp_paths: [ + "/main".to_owned(), + "/sub".to_owned(), + ], }).unwrap(); { let mut l = db.lock(); + let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap(); let mut tx = l.tx().unwrap(); - tx.update_retention(camera_id, 42).unwrap(); + tx.update_retention(stream_id, 42).unwrap(); tx.commit().unwrap(); } let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid }; @@ -1656,8 +1843,9 @@ mod tests { // Inserting a recording should succeed and remove its uuid from the reserved table. let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); - let recording = RecordingToInsert{ - camera_id: camera_id, + let stream_id = camera_id; // TODO + let recording = RecordingToInsert { + stream_id, sample_file_bytes: 42, run_offset: 0, flags: 0, @@ -1680,19 +1868,19 @@ mod tests { vec![uuid_to_keep]); // Queries should return the correct result (with caches update on insert). - assert_single_recording(&db, camera_uuid, &recording); + assert_single_recording(&db, stream_id, &recording); // Queries on a fresh database should return the correct result (with caches populated from // existing database contents rather than built on insert). let conn = db.close(); let db = Database::new(conn).unwrap(); - assert_single_recording(&db, camera_uuid, &recording); + assert_single_recording(&db, stream_id, &recording); // Deleting a recording should succeed, update the min/max times, and re-reserve the uuid. 
{ let mut db = db.lock(); let mut v = Vec::new(); - db.list_oldest_sample_files(camera_id, |r| { v.push(r); true }).unwrap(); + db.list_oldest_sample_files(stream_id, |r| { v.push(r); true }).unwrap(); assert_eq!(1, v.len()); let mut tx = db.tx().unwrap(); tx.delete_recordings(&v).unwrap(); diff --git a/src/dir.rs b/src/dir.rs index e23485d..32495a2 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -254,7 +254,7 @@ pub fn start_syncer(dir: Arc) } pub struct NewLimit { - pub camera_id: i32, + pub stream_id: i32, pub limit: i64, } @@ -272,30 +272,30 @@ pub fn lower_retention(dir: Arc, limits: &[NewLimit]) -> Result<( let mut to_delete = Vec::new(); for l in limits { let before = to_delete.len(); - let camera = db.cameras_by_id().get(&l.camera_id) - .ok_or_else(|| Error::new(format!("no such camera {}", l.camera_id)))?; - if l.limit >= camera.sample_file_bytes { continue } - get_rows_to_delete(db, l.camera_id, camera, camera.retain_bytes - l.limit, + let stream = db.streams_by_id().get(&l.stream_id) + .ok_or_else(|| Error::new(format!("no such stream {}", l.stream_id)))?; + if l.limit >= stream.sample_file_bytes { continue } + get_rows_to_delete(db, l.stream_id, stream, stream.retain_bytes - l.limit, &mut to_delete)?; - info!("camera {}, {}->{}, deleting {} rows", camera.short_name, - camera.sample_file_bytes, l.limit, to_delete.len() - before); + info!("stream {}, {}->{}, deleting {} rows", stream.id, + stream.sample_file_bytes, l.limit, to_delete.len() - before); } Ok(to_delete) }) } -/// Gets rows to delete to bring a camera's disk usage within bounds. -fn get_rows_to_delete(db: &db::LockedDatabase, camera_id: i32, - camera: &db::Camera, extra_bytes_needed: i64, +/// Gets rows to delete to bring a stream's disk usage within bounds. 
+fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32, + stream: &db::Stream, extra_bytes_needed: i64, to_delete: &mut Vec) -> Result<(), Error> { - let bytes_needed = camera.sample_file_bytes + extra_bytes_needed - camera.retain_bytes; + let bytes_needed = stream.sample_file_bytes + extra_bytes_needed - stream.retain_bytes; let mut bytes_to_delete = 0; if bytes_needed <= 0 { - debug!("{}: have remaining quota of {}", camera.short_name, -bytes_needed); + debug!("{}: have remaining quota of {}", stream.id, -bytes_needed); return Ok(()); } let mut n = 0; - db.list_oldest_sample_files(camera_id, |row| { + db.list_oldest_sample_files(stream_id, |row| { bytes_to_delete += row.sample_file_bytes as i64; to_delete.push(row); n += 1; @@ -303,10 +303,10 @@ fn get_rows_to_delete(db: &db::LockedDatabase, camera_id: i32, })?; if bytes_needed > bytes_to_delete { return Err(Error::new(format!("{}: couldn't find enough files to delete: {} left.", - camera.short_name, bytes_needed))); + stream.id, bytes_needed))); } info!("{}: deleting {} bytes in {} recordings ({} bytes needed)", - camera.short_name, bytes_to_delete, n, bytes_needed); + stream.id, bytes_to_delete, n, bytes_needed); Ok(()) } @@ -343,12 +343,12 @@ impl Syncer { } } - /// Rotates files for all cameras and deletes stale reserved uuids from previous runs. + /// Rotates files for all streams and deletes stale reserved uuids from previous runs. 
fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { let mut to_delete = Vec::new(); - for (camera_id, camera) in db.cameras_by_id() { - get_rows_to_delete(&db, *camera_id, camera, 0, &mut to_delete)?; + for (stream_id, stream) in db.streams_by_id() { + get_rows_to_delete(&db, *stream_id, stream, 0, &mut to_delete)?; } Ok(to_delete) }) @@ -389,7 +389,7 @@ impl Syncer { fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { if let Err(e) = self.save_helper(&recording, f) { error!("camera {}: will discard recording {} due to error while saving: {}", - recording.camera_id, recording.sample_file_uuid, e); + recording.stream_id, recording.sample_file_uuid, e); self.to_unlink.push(recording.sample_file_uuid); return; } @@ -416,10 +416,10 @@ impl Syncer { let mut db = self.dir.db.lock(); let mut new_next_uuid = l.next_uuid; { - let camera = - db.cameras_by_id().get(&recording.camera_id) - .ok_or_else(|| Error::new(format!("no such camera {}", recording.camera_id)))?; - get_rows_to_delete(&db, recording.camera_id, camera, + let stream = + db.streams_by_id().get(&recording.stream_id) + .ok_or_else(|| Error::new(format!("no such stream {}", recording.stream_id)))?; + get_rows_to_delete(&db, recording.stream_id, stream, recording.sample_file_bytes as i64, &mut to_delete)?; } let mut tx = db.tx()?; @@ -490,7 +490,7 @@ struct InnerWriter<'a> { adjuster: ClockAdjuster, - camera_id: i32, + stream_id: i32, video_sample_entry_id: i32, run_offset: i32, @@ -567,20 +567,20 @@ pub struct PreviousWriter { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). 
- fn open(f: fs::File, uuid: Uuid, prev: Option, camera_id: i32, + fn open(f: fs::File, uuid: Uuid, prev: Option, stream_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { Ok(Writer(Some(InnerWriter{ - syncer_channel: syncer_channel, - f: f, + syncer_channel, + f, index: recording::SampleIndexEncoder::new(), - uuid: uuid, + uuid, corrupt: false, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - camera_id: camera_id, - video_sample_entry_id: video_sample_entry_id, + stream_id, + video_sample_entry_id, run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, }))) @@ -663,7 +663,7 @@ impl<'a> InnerWriter<'a> { else { 0 }; let local_start_delta = self.local_start - start; let recording = db::RecordingToInsert{ - camera_id: self.camera_id, + stream_id: self.stream_id, sample_file_bytes: self.index.sample_file_bytes, time: start .. end, local_time_delta: local_start_delta, diff --git a/src/json.rs b/src/json.rs index df5c9bb..1dca6bf 100644 --- a/src/json.rs +++ b/src/json.rs @@ -29,6 +29,7 @@ // along with this program. If not, see . use db; +use error::Error; use serde::ser::{SerializeMap, SerializeSeq, Serializer}; use std::collections::BTreeMap; use uuid::Uuid; @@ -41,17 +42,25 @@ pub struct TopLevel<'a> { // Use a custom serializer which presents the map's values as a sequence and includes the // "days" attribute or not, according to the bool in the tuple. #[serde(serialize_with = "TopLevel::serialize_cameras")] - pub cameras: (&'a BTreeMap, bool), + pub cameras: (&'a db::LockedDatabase, bool), } -/// JSON serialization wrapper for a single camera when processing `/cameras/` and -/// `/cameras//`. See `design/api.md` for details. +/// JSON serialization wrapper for a single camera when processing `/api/` and +/// `/api/cameras//`. 
See `design/api.md` for details. #[derive(Debug, Serialize)] #[serde(rename_all="camelCase")] pub struct Camera<'a> { pub uuid: Uuid, pub short_name: &'a str, pub description: &'a str, + + #[serde(serialize_with = "Camera::serialize_streams")] + pub streams: [Option>; 2], +} + +#[derive(Debug, Serialize)] +#[serde(rename_all="camelCase")] +pub struct Stream<'a> { pub retain_bytes: i64, pub min_start_time_90k: Option, pub max_end_time_90k: Option, @@ -59,26 +68,54 @@ pub struct Camera<'a> { pub total_sample_file_bytes: i64, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(serialize_with = "Camera::serialize_days")] - pub days: Option<&'a BTreeMap>, + #[serde(serialize_with = "Stream::serialize_days")] + pub days: Option<&'a BTreeMap>, } impl<'a> Camera<'a> { - pub fn new(c: &'a db::Camera, include_days: bool) -> Self { - Camera{ + pub fn wrap(c: &'a db::Camera, db: &'a db::LockedDatabase, include_days: bool) -> Result { + Ok(Camera { uuid: c.uuid, short_name: &c.short_name, description: &c.description, - retain_bytes: c.retain_bytes, - min_start_time_90k: c.range.as_ref().map(|r| r.start.0), - max_end_time_90k: c.range.as_ref().map(|r| r.end.0), - total_duration_90k: c.duration.0, - total_sample_file_bytes: c.sample_file_bytes, - days: if include_days { Some(&c.days) } else { None }, - } + streams: [ + Stream::wrap(db, c.streams[0], include_days)?, + Stream::wrap(db, c.streams[1], include_days)?, + ], + }) } - fn serialize_days(days: &Option<&BTreeMap>, + fn serialize_streams(streams: &[Option>; 2], serializer: S) -> Result + where S: Serializer { + let mut map = serializer.serialize_map(Some(streams.len()))?; + for (i, s) in streams.iter().enumerate() { + if let &Some(ref s) = s { + map.serialize_key(db::StreamType::from_index(i).expect("invalid stream type index").as_str())?; + map.serialize_value(s)?; + } + } + map.end() + } +} + +impl<'a> Stream<'a> { + fn wrap(db: &'a db::LockedDatabase, id: Option, include_days: bool) -> Result, Error> { + let id = 
match id { + Some(id) => id, + None => return Ok(None), + }; + let s = db.streams_by_id().get(&id).ok_or_else(|| Error::new(format!("missing stream {}", id)))?; + Ok(Some(Stream { + retain_bytes: s.retain_bytes, + min_start_time_90k: s.range.as_ref().map(|r| r.start.0), + max_end_time_90k: s.range.as_ref().map(|r| r.end.0), + total_duration_90k: s.duration.0, + total_sample_file_bytes: s.sample_file_bytes, + days: if include_days { Some(&s.days) } else { None }, + })) + } + + fn serialize_days(days: &Option<&BTreeMap>, serializer: S) -> Result where S: Serializer { let days = match *days { @@ -89,7 +126,7 @@ impl<'a> Camera<'a> { for (k, v) in days { map.serialize_key(k.as_ref())?; let bounds = k.bounds(); - map.serialize_value(&CameraDayValue{ + map.serialize_value(&StreamDayValue{ start_time_90k: bounds.start.0, end_time_90k: bounds.end.0, total_duration_90k: v.duration.0, @@ -101,7 +138,7 @@ impl<'a> Camera<'a> { #[derive(Debug, Serialize)] #[serde(rename_all="camelCase")] -struct CameraDayValue { +struct StreamDayValue { pub start_time_90k: i64, pub end_time_90k: i64, pub total_duration_90k: i64, @@ -109,12 +146,14 @@ struct CameraDayValue { impl<'a> TopLevel<'a> { /// Serializes cameras as a list (rather than a map), optionally including the `days` field. - fn serialize_cameras(cameras: &(&BTreeMap, bool), + fn serialize_cameras(cameras: &(&db::LockedDatabase, bool), serializer: S) -> Result where S: Serializer { - let mut seq = serializer.serialize_seq(Some(cameras.0.len()))?; - for c in cameras.0.values() { - seq.serialize_element(&Camera::new(c, cameras.1))?; + let (db, include_days) = *cameras; + let cs = db.cameras_by_id(); + let mut seq = serializer.serialize_seq(Some(cs.len()))?; + for (_, c) in cs { + seq.serialize_element(&Camera::wrap(c, db, include_days).unwrap())?; // TODO: no unwrap. 
} seq.end() } diff --git a/src/mp4.rs b/src/mp4.rs index 8635313..df91de9 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -381,7 +381,7 @@ impl Segment { self.index_once.call_once(|| { let index = unsafe { &mut *self.index.get() }; *index = db.lock() - .with_recording_playback(self.s.camera_id, self.s.recording_id, + .with_recording_playback(self.s.stream_id, self.s.recording_id, |playback| self.build_index(playback)) .map_err(|e| { error!("Unable to build index for segment: {:?}", e); }); }); @@ -629,7 +629,7 @@ impl Slice { } let truns = mp4.0.db.lock() - .with_recording_playback(s.s.camera_id, s.s.recording_id, + .with_recording_playback(s.s.stream_id, s.s.recording_id, |playback| s.truns(playback, pos, len)) .map_err(|e| { Error::new(format!("Unable to build index for segment: {:?}", e)) })?; let truns = ARefs::new(truns); @@ -762,7 +762,7 @@ impl FileBuilder { if prev.s.have_trailing_zero() { return Err(Error::new(format!( "unable to append recording {}/{} after recording {}/{} with trailing zero", - row.camera_id, row.id, prev.s.camera_id, prev.s.recording_id))); + row.stream_id, row.id, prev.s.stream_id, prev.s.recording_id))); } } let s = Segment::new(db, &row, rel_range_90k, self.next_frame_num)?; @@ -811,7 +811,7 @@ impl FileBuilder { // Update the etag to reflect this segment. let mut data = [0_u8; 24]; let mut cursor = io::Cursor::new(&mut data[..]); - cursor.write_i32::(s.s.camera_id)?; + cursor.write_i32::(s.s.stream_id)?; cursor.write_i32::(s.s.recording_id)?; cursor.write_i64::(s.s.start.0)?; cursor.write_i32::(d.start)?; @@ -1452,7 +1452,7 @@ impl FileInner { fn get_video_sample_data(&self, i: usize, r: Range) -> Result { let s = &self.segments[i]; let uuid = { - self.db.lock().with_recording_playback(s.s.camera_id, s.s.recording_id, + self.db.lock().with_recording_playback(s.s.stream_id, s.s.recording_id, |p| Ok(p.sample_file_uuid))? 
}; let f = self.dir.open_sample_file(uuid)?; @@ -1541,7 +1541,7 @@ mod tests { use strutil; use super::*; use stream::{self, Opener, Stream}; - use testutil::{self, TestDb, TEST_CAMERA_ID}; + use testutil::{self, TestDb, TEST_STREAM_ID}; fn fill_slice(slice: &mut [u8], e: &E, start: u64) { let mut p = 0; @@ -1765,7 +1765,7 @@ mod tests { extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec).unwrap(); let mut output = db.dir.create_writer(&db.syncer_channel, None, - TEST_CAMERA_ID, video_sample_entry_id).unwrap(); + TEST_STREAM_ID, video_sample_entry_id).unwrap(); // end_pts is the pts of the end of the most recent frame (start + duration). // It's needed because dir::Writer calculates a packet's duration from its pts and the @@ -1799,7 +1799,7 @@ mod tests { let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); { let db = db.lock(); - db.list_recordings_by_time(TEST_CAMERA_ID, all_time, |r| { + db.list_recordings_by_time(TEST_STREAM_ID, all_time, |r| { let d = r.duration_90k; assert!(skip_90k + shorten_90k < d); builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap(); @@ -2259,7 +2259,7 @@ mod bench { let segment = { let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); let mut row = None; - db.list_recordings_by_time(testutil::TEST_CAMERA_ID, all_time, |r| { + db.list_recordings_by_time(testutil::TEST_STREAM_ID, all_time, |r| { row = Some(r); Ok(()) }).unwrap(); @@ -2267,7 +2267,7 @@ mod bench { let rel_range_90k = 0 .. row.duration_90k; super::Segment::new(&db, &row, rel_range_90k, 1).unwrap() }; - db.with_recording_playback(segment.s.camera_id, segment.s.recording_id, |playback| { + db.with_recording_playback(segment.s.stream_id, segment.s.recording_id, |playback| { let v = segment.build_index(playback).unwrap(); // warm. b.bytes = v.len() as u64; // define the benchmark performance in terms of output bytes. 
b.iter(|| segment.build_index(playback).unwrap()); diff --git a/src/recording.rs b/src/recording.rs index cb2c49e..4547909 100644 --- a/src/recording.rs +++ b/src/recording.rs @@ -354,7 +354,7 @@ impl SampleIndexEncoder { /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. #[derive(Debug)] pub struct Segment { - pub camera_id: i32, + pub stream_id: i32, pub recording_id: i32, pub start: Time, @@ -381,8 +381,8 @@ impl Segment { pub fn new(db: &db::LockedDatabase, recording: &db::ListRecordingsRow, desired_range_90k: Range) -> Result { - let mut self_ = Segment{ - camera_id: recording.camera_id, + let mut self_ = Segment { + stream_id: recording.stream_id, recording_id: recording.id, start: recording.start, begin: None, @@ -413,7 +413,7 @@ impl Segment { // Slow path. Need to iterate through the index. trace!("recording::Segment::new slow path, desired_range_90k={:?}, recording={:#?}", self_.desired_range_90k, recording); - db.with_recording_playback(self_.camera_id, self_.recording_id, |playback| { + db.with_recording_playback(self_.stream_id, self_.recording_id, |playback| { let mut begin = Box::new(SampleIndexIterator::new()); let data = &(&playback).video_index; let mut it = SampleIndexIterator::new(); @@ -481,7 +481,7 @@ impl Segment { pub fn foreach(&self, playback: &db::RecordingPlayback, mut f: F) -> Result<(), Error> where F: FnMut(&SampleIndexIterator) -> Result<(), Error> { trace!("foreach on recording {}/{}: {} frames, actual_start_90k: {}", - self.camera_id, self.recording_id, self.frames, self.actual_start_90k()); + self.stream_id, self.recording_id, self.frames, self.actual_start_90k()); let data = &(&playback).video_index; let mut it = match self.begin { Some(ref b) => **b, @@ -490,11 +490,11 @@ impl Segment { if it.uninitialized() { if !it.next(data)? 
{ return Err(Error::new(format!("recording {}/{}: no frames", - self.camera_id, self.recording_id))); + self.stream_id, self.recording_id))); } if !it.is_key() { return Err(Error::new(format!("recording {}/{}: doesn't start with key frame", - self.camera_id, self.recording_id))); + self.stream_id, self.recording_id))); } } let mut have_frame = true; @@ -502,7 +502,7 @@ impl Segment { for i in 0 .. self.frames { if !have_frame { return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}", - self.camera_id, self.recording_id, self.frames, + self.stream_id, self.recording_id, self.frames, i+1))); } if it.is_key() { @@ -510,7 +510,7 @@ impl Segment { if key_frame > self.key_frames { return Err(Error::new(format!( "recording {}/{}: more than expected {} key frames", - self.camera_id, self.recording_id, self.key_frames))); + self.stream_id, self.recording_id, self.key_frames))); } } @@ -522,7 +522,7 @@ impl Segment { } if key_frame < self.key_frames { return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}", - self.camera_id, self.recording_id, self.key_frames, + self.stream_id, self.recording_id, self.key_frames, key_frame))); } Ok(()) @@ -656,7 +656,7 @@ mod tests { fn get_frames(db: &db::Database, segment: &Segment, f: F) -> Vec where F: Fn(&SampleIndexIterator) -> T { let mut v = Vec::new(); - db.lock().with_recording_playback(segment.camera_id, segment.recording_id, |playback| { + db.lock().with_recording_playback(segment.stream_id, segment.recording_id, |playback| { segment.foreach(playback, |it| { v.push(f(it)); Ok(()) }) }).unwrap(); v diff --git a/src/schema.sql b/src/schema.sql index fe55230..5e643af 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -63,31 +63,34 @@ create table camera ( username text, -- The password to use when accessing the camera. 
- password text, + password text +); - -- The path (starting with "/") to use in rtsp:// URLs to reference this - -- camera's "main" (full-quality) video stream. - main_rtsp_path text, +create table stream ( + id integer primary key, + camera_id integer not null references camera (id), + type text not null check (type in ('main', 'sub')), - -- The path (starting with "/") to use in rtsp:// URLs to reference this - -- camera's "sub" (low-bandwidth) video stream. - sub_rtsp_path text, + -- The path (starting with "/") to use in rtsp:// URLs to for this stream. + rtsp_path text not null, -- The number of bytes of video to retain, excluding the currently-recording -- file. Older files will be deleted as necessary to stay within this limit. retain_bytes integer not null check (retain_bytes >= 0), - -- The low 32 bits of the next recording id to assign for this camera. + -- The low 32 bits of the next recording id to assign for this stream. -- Typically this is the maximum current recording + 1, but it does -- not decrease if that recording is deleted. - next_recording_id integer not null check (next_recording_id >= 0) + next_recording_id integer not null check (next_recording_id >= 0), + + unique (camera_id, type) ); -- Each row represents a single completed recorded segment of video. -- Recordings are typically ~60 seconds; never more than 5 minutes. create table recording ( - -- The high 32 bits of composite_id are taken from the camera's id, which - -- improves locality. The low 32 bits are taken from the camera's + -- The high 32 bits of composite_id are taken from the stream's id, which + -- improves locality. The low 32 bits are taken from the stream's -- next_recording_id (which should be post-incremented in the same -- transaction). 
It'd be simpler to use a "without rowid" table and separate -- fields to make up the primary key, but @@ -98,7 +101,7 @@ create table recording ( -- This field is redundant with id above, but used to enforce the reference -- constraint and to structure the recording_start_time index. - camera_id integer not null references camera (id), + stream_id integer not null references stream (id), -- The offset of this recording within a run. 0 means this was the first -- recording made from a RTSP session. The start of the run has id @@ -135,12 +138,12 @@ create table recording ( video_sync_samples integer not null check (video_sync_samples > 0), video_sample_entry_id integer references video_sample_entry (id), - check (composite_id >> 32 = camera_id) + check (composite_id >> 32 = stream_id) ); create index recording_cover on recording ( - -- Typical queries use "where camera_id = ? order by start_time_90k". - camera_id, + -- Typical queries use "where stream_id = ? order by start_time_90k". + stream_id, start_time_90k, -- These fields are not used for ordering; they cover most queries so @@ -202,4 +205,4 @@ create table video_sample_entry ( ); insert into version (id, unix_time, notes) - values (1, cast(strftime('%s', 'now') as int), 'db creation'); + values (2, cast(strftime('%s', 'now') as int), 'db creation'); diff --git a/src/streamer.rs b/src/streamer.rs index 73702f4..61054d5 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -29,7 +29,7 @@ // along with this program. If not, see . 
use clock::{Clocks, TimerGuard}; -use db::{Camera, Database}; +use db::{Camera, Database, Stream}; use dir; use error::Error; use h264; @@ -62,7 +62,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { syncer_channel: dir::SyncerChannel, clocks: &'a C, opener: &'a stream::Opener, - camera_id: i32, + stream_id: i32, short_name: String, url: String, redacted_url: String, @@ -77,9 +77,9 @@ struct WriterState<'a> { impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, - camera_id: i32, c: &Camera, rotate_offset_sec: i64, + stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64, rotate_interval_sec: i64) -> Self { - Streamer{ + Streamer { shutdown: env.shutdown.clone(), rotate_offset_sec: rotate_offset_sec, rotate_interval_sec: rotate_interval_sec, @@ -88,10 +88,10 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { syncer_channel: syncer_channel, clocks: env.clocks, opener: env.opener, - camera_id: camera_id, - short_name: c.short_name.to_owned(), - url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, c.main_rtsp_path), - redacted_url: format!("rtsp://{}:redacted@{}{}", c.username, c.host, c.main_rtsp_path), + stream_id: stream_id, + short_name: format!("{}-{}", c.short_name, s.type_.as_str()), + url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, s.rtsp_path), + redacted_url: format!("rtsp://{}:redacted@{}{}", c.username, c.host, s.rtsp_path), } } @@ -167,7 +167,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 }; let _t = TimerGuard::new(self.clocks, || "creating writer"); - let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id, + let w = self.dir.create_writer(&self.syncer_channel, prev, self.stream_id, video_sample_entry_id)?; WriterState{ 
writer: w, @@ -358,8 +358,9 @@ mod tests { { let l = db.db.lock(); let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap(); - stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_CAMERA_ID, - camera, 0, 3); + let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap(); + stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_STREAM_ID, + camera, s, 0, 3); } stream.run(); assert!(opener.streams.lock().unwrap().is_empty()); @@ -370,7 +371,7 @@ mod tests { // 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later: // * the first rotation is always skipped // * the second rotation is deferred until a key frame. - assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 1), &[ + assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 1), &[ Frame{start_90k: 0, duration_90k: 90379, is_key: true}, Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, @@ -380,12 +381,12 @@ mod tests { Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001... Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, ]); - assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 2), &[ + assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 2), &[ Frame{start_90k: 0, duration_90k: 90011, is_key: true}, Frame{start_90k: 90011, duration_90k: 0, is_key: false}, ]); let mut recordings = Vec::new(); - db.list_recordings_by_id(testutil::TEST_CAMERA_ID, 1..3, |r| { + db.list_recordings_by_id(testutil::TEST_STREAM_ID, 1..3, |r| { recordings.push(r); Ok(()) }).unwrap(); diff --git a/src/testutil.rs b/src/testutil.rs index ec2eae7..d4d8793 100644 --- a/src/testutil.rs +++ b/src/testutil.rs @@ -45,6 +45,7 @@ static INIT: sync::Once = sync::ONCE_INIT; /// id of the camera created by `TestDb::new` below. pub const TEST_CAMERA_ID: i32 = 1; +pub const TEST_STREAM_ID: i32 = 1; /// Performs global initialization for tests. 
/// * set up logging. (Note the output can be confusing unless `RUST_TEST_THREADS=1` is set in @@ -89,12 +90,14 @@ impl TestDb { host: "test-camera".to_owned(), username: "foo".to_owned(), password: "bar".to_owned(), - main_rtsp_path: "/main".to_owned(), - sub_rtsp_path: "/sub".to_owned(), + rtsp_paths: [ + "/main".to_owned(), + "/sub".to_owned(), + ], }).unwrap()); test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid; let mut tx = l.tx().unwrap(); - tx.update_retention(TEST_CAMERA_ID, 1048576).unwrap(); + tx.update_retention(TEST_STREAM_ID, 1048576).unwrap(); tx.commit().unwrap(); } let path = tmpdir.path().to_str().unwrap().to_owned(); @@ -121,7 +124,7 @@ impl TestDb { tx.bypass_reservation_for_testing = true; const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); row_id = tx.insert_recording(&db::RecordingToInsert{ - camera_id: TEST_CAMERA_ID, + stream_id: TEST_STREAM_ID, sample_file_bytes: encoder.sample_file_bytes, time: START_TIME .. START_TIME + recording::Duration(encoder.total_duration_90k as i64), @@ -138,7 +141,7 @@ impl TestDb { tx.commit().unwrap(); } let mut row = None; - db.list_recordings_by_id(TEST_CAMERA_ID, row_id .. row_id + 1, + db.list_recordings_by_id(TEST_STREAM_ID, row_id .. row_id + 1, |r| { row = Some(r); Ok(()) }).unwrap(); row.unwrap() } @@ -155,7 +158,7 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const DURATION: recording::Duration = recording::Duration(5399985); let mut recording = db::RecordingToInsert{ - camera_id: TEST_CAMERA_ID, + stream_id: TEST_STREAM_ID, sample_file_bytes: 30104460, flags: 0, time: START_TIME .. (START_TIME + DURATION), diff --git a/src/web.rs b/src/web.rs index d8ce81d..b0eac70 100644 --- a/src/web.rs +++ b/src/web.rs @@ -66,13 +66,13 @@ lazy_static! 
{ } enum Path { - TopLevel, // "/api/" - InitSegment([u8; 20]), // "/api/init/.mp4" - Camera(Uuid), // "/api/cameras//" - CameraRecordings(Uuid), // "/api/cameras//recordings" - CameraViewMp4(Uuid), // "/api/cameras//view.mp4" - CameraViewMp4Segment(Uuid), // "/api/cameras//view.m4s" - Static, // "" + TopLevel, // "/api/" + InitSegment([u8; 20]), // "/api/init/.mp4" + Camera(Uuid), // "/api/cameras//" + StreamRecordings(Uuid, db::StreamType), // "/api/cameras///recordings" + StreamViewMp4(Uuid, db::StreamType), // "/api/cameras///view.mp4" + StreamViewMp4Segment(Uuid, db::StreamType), // "/api/cameras///view.m4s" + Static, // "" NotFound, } @@ -101,18 +101,33 @@ fn decode_path(path: &str) -> Path { None => { return Path::NotFound; }, Some(s) => s, }; - let (uuid, path) = path.split_at(slash); + let uuid = &path[0 .. slash]; + let path = &path[slash+1 .. ]; // TODO(slamb): require uuid to be in canonical format. let uuid = match Uuid::parse_str(uuid) { Ok(u) => u, Err(_) => { return Path::NotFound }, }; + + if path.is_empty() { + return Path::Camera(uuid); + } + + let slash = match path.find('/') { + None => { return Path::NotFound; }, + Some(s) => s, + }; + let (type_, path) = path.split_at(slash); + + let type_ = match db::StreamType::parse(type_) { + None => { return Path::NotFound; }, + Some(t) => t, + }; match path { - "/" => Path::Camera(uuid), - "/recordings" => Path::CameraRecordings(uuid), - "/view.mp4" => Path::CameraViewMp4(uuid), - "/view.m4s" => Path::CameraViewMp4Segment(uuid), + "/recordings" => Path::StreamRecordings(uuid, type_), + "/view.mp4" => Path::StreamViewMp4(uuid, type_), + "/view.m4s" => Path::StreamViewMp4Segment(uuid, type_), _ => Path::NotFound, } } @@ -200,7 +215,7 @@ impl ServiceInner { let db = self.db.lock(); serde_json::to_writer(&mut w, &json::TopLevel { time_zone_name: &self.time_zone_name, - cameras: (db.cameras_by_id(), days), + cameras: (&db, days), })?; } Ok(resp) @@ -212,12 +227,12 @@ impl ServiceInner { let db = 
self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - serde_json::to_writer(&mut w, &json::Camera::new(camera, true))? + serde_json::to_writer(&mut w, &json::Camera::wrap(camera, &db, true)?)? }; Ok(resp) } - fn camera_recordings(&self, req: &Request, uuid: Uuid) + fn stream_recordings(&self, req: &Request, uuid: Uuid, type_: db::StreamType) -> Result, Error> { let (r, split) = { let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); @@ -240,7 +255,9 @@ impl ServiceInner { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - db.list_aggregated_recordings(camera.id, r, split, |row| { + let stream_id = camera.streams[type_.index()] + .ok_or_else(|| Error::new("no such stream".to_owned()))?; + db.list_aggregated_recordings(stream_id, r, split, |row| { let end = row.ids.end - 1; // in api, ids are inclusive. out.recordings.push(json::Recording { start_id: row.ids.start, @@ -276,23 +293,23 @@ impl ServiceInner { self.not_found() } - fn camera_view_mp4(&self, uuid: Uuid, type_: mp4::Type, query: Option<&str>, req: &Request) - -> Result, Error> { - let camera_id = { + fn stream_view_mp4(&self, req: &Request, uuid: Uuid, stream_type_: db::StreamType, + mp4_type_: mp4::Type) -> Result, Error> { + let stream_id = { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - camera.id + camera.streams[stream_type_.index()].ok_or_else(|| Error::new("no such stream".to_owned()))? 
}; - let mut builder = mp4::FileBuilder::new(type_); - if let Some(q) = query { + let mut builder = mp4::FileBuilder::new(mp4_type_); + if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { let (key, value) = (key.borrow(), value.borrow()); match key { "s" => { let s = Segments::parse(value).map_err( |_| Error::new(format!("invalid s parameter: {}", value)))?; - debug!("camera_view_mp4: appending s={:?}", s); + debug!("stream_view_mp4: appending s={:?}", s); let mut est_segments = (s.ids.end - s.ids.start) as usize; if let Some(end) = s.end_time { // There should be roughly ceil((end - start) / @@ -309,15 +326,15 @@ impl ServiceInner { let db = self.db.lock(); let mut prev = None; let mut cur_off = 0; - db.list_recordings_by_id(camera_id, s.ids.clone(), |r| { + db.list_recordings_by_id(stream_id, s.ids.clone(), |r| { // Check for missing recordings. match prev { None if r.id == s.ids.start => {}, None => return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.start))), + stream_id, s.ids.start))), Some(id) if r.id != id + 1 => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, id + 1))); + stream_id, id + 1))); }, _ => {}, }; @@ -330,11 +347,11 @@ impl ServiceInner { let start = cmp::max(0, s.start_time - cur_off); let end = cmp::min(d, end_time - cur_off); let times = start as i32 .. end as i32; - debug!("...appending recording {}/{} with times {:?} (out of dur {})", - r.camera_id, r.id, times, d); + debug!("...appending recording {}/{} with times {:?} \ + (out of dur {})", r.stream_id, r.id, times, d); builder.append(&db, r, start as i32 .. 
end as i32)?; } else { - debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d); + debug!("...skipping recording {}/{} dur {}", r.stream_id, r.id, d); } cur_off += d; Ok(()) @@ -344,11 +361,11 @@ impl ServiceInner { match prev { Some(id) if s.ids.end != id + 1 => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.end - 1))); + stream_id, s.ids.end - 1))); }, None => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.start))); + stream_id, s.ids.start))); }, _ => {}, }; @@ -451,12 +468,12 @@ impl server::Service for Service { Path::InitSegment(sha1) => self.0.init_segment(sha1, &req), Path::TopLevel => self.0.top_level(&req), Path::Camera(uuid) => self.0.camera(&req, uuid), - Path::CameraRecordings(uuid) => self.0.camera_recordings(&req, uuid), - Path::CameraViewMp4(uuid) => { - self.0.camera_view_mp4(uuid, mp4::Type::Normal, req.uri().query(), &req) + Path::StreamRecordings(uuid, type_) => self.0.stream_recordings(&req, uuid, type_), + Path::StreamViewMp4(uuid, type_) => { + self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::Normal) }, - Path::CameraViewMp4Segment(uuid) => { - self.0.camera_view_mp4(uuid, mp4::Type::MediaSegment, req.uri().query(), &req) + Path::StreamViewMp4Segment(uuid, type_) => { + self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::MediaSegment) }, Path::NotFound => self.0.not_found(), Path::Static => self.0.static_file(&req), @@ -539,10 +556,10 @@ mod bench { } #[bench] - fn serve_camera_recordings(b: &mut Bencher) { + fn serve_stream_recordings(b: &mut Bencher) { testutil::init(); let server = &*SERVER; - let url = reqwest::Url::parse(&format!("{}/api/cameras/{}/recordings", server.base_url, + let url = reqwest::Url::parse(&format!("{}/api/cameras/{}/main/recordings", server.base_url, server.test_camera_uuid)).unwrap(); let mut buf = Vec::new(); let client = reqwest::Client::new(); diff --git a/ui-src/index.html b/ui-src/index.html index c3b4590..ce6bfc5 100644 --- 
a/ui-src/index.html +++ b/ui-src/index.html @@ -40,8 +40,10 @@