diff --git a/design/api.md b/design/api.md index 60a689e..c35cad0 100644 --- a/design/api.md +++ b/design/api.md @@ -44,29 +44,32 @@ The `application/json` response will have a dict as follows: * `uuid`: in text format * `shortName`: a short name (typically one or two words) * `description`: a longer description (typically a phrase or paragraph) - * `retainBytes`: the configured total number of bytes of completed - recordings to retain. - * `minStartTime90k`: the start time of the earliest recording for this - camera, in 90kHz units since 1970-01-01 00:00:00 UTC. - * `maxEndTime90k`: the end time of the latest recording for this camera, - in 90kHz units since 1970-01-01 00:00:00 UTC. - * `totalDuration90k`: the total duration recorded, in 90 kHz units. - This is no greater than `maxEndTime90k - maxStartTime90k`; it will be - lesser if there are gaps in the recorded data. - * `totalSampleFileBytes`: the total number of bytes of sample data (the - `mdat` portion of a `.mp4` file). - * `days`: object representing calendar days (in the server's time zone) - with non-zero total duration of recordings for that day. The keys are - of the form `YYYY-mm-dd`; the values are objects with the following - attributes: - * `totalDuration90k` is the total duration recorded during that day. - If a recording spans a day boundary, some portion of it is accounted to - each day. - * `startTime90k` is the start of that calendar day in the server's time - zone. - * `endTime90k` is the end of that calendar day in the server's time zone. - It is usually 24 hours after the start time. It might be 23 hours or 25 - hours during spring forward or fall back, respectively. + * `streams`: a dict of stream type ("main" or "sub") to a dictionary + describing the stream: + * `retainBytes`: the configured total number of bytes of completed + recordings to retain. + * `minStartTime90k`: the start time of the earliest recording for + this camera, in 90kHz units since 1970-01-01 00:00:00 UTC. + * `maxEndTime90k`: the end time of the latest recording for this + camera, in 90kHz units since 1970-01-01 00:00:00 UTC. + * `totalDuration90k`: the total duration recorded, in 90 kHz units. + This is no greater than `maxEndTime90k - maxStartTime90k`; it will + be lesser if there are gaps in the recorded data. + * `totalSampleFileBytes`: the total number of bytes of sample data + (the `mdat` portion of a `.mp4` file). + * `days`: object representing calendar days (in the server's time + zone) with non-zero total duration of recordings for that day. The + keys are of the form `YYYY-mm-dd`; the values are objects with the + following attributes: + * `totalDuration90k` is the total duration recorded during that + day. If a recording spans a day boundary, some portion of it + is accounted to each day. + * `startTime90k` is the start of that calendar day in the + server's time zone. + * `endTime90k` is the end of that calendar day in the server's + time zone. It is usually 24 hours after the start time. It + might be 23 hours or 25 hours during spring forward or fall + back, respectively. Example response: @@ -78,23 +81,27 @@ Example response: "uuid": "fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe", "shortName": "driveway", "description": "Hikvision DS-2CD2032 overlooking the driveway from east", - "retainBytes": 536870912000, - "minStartTime90k": 130888729442361, - "maxEndTime90k": 130985466591817, - "totalDuration90k": 96736169725, - "totalSampleFileBytes": 446774393937, - "days": { - "2016-05-01": { - "endTime90k": 131595516000000, - "startTime90k": 131587740000000, - "totalDuration90k": 52617609 - }, - "2016-05-02": { - "endTime90k": 131603292000000, - "startTime90k": 131595516000000, - "totalDuration90k": 20946022 + "streams": { + "main": { + "retainBytes": 536870912000, + "minStartTime90k": 130888729442361, + "maxEndTime90k": 130985466591817, + "totalDuration90k": 96736169725, + "totalSampleFileBytes": 446774393937, + "days": { + "2016-05-01": { + "endTime90k": 131595516000000, + "startTime90k": 131587740000000, + "totalDuration90k": 52617609 + }, + "2016-05-02": { + "endTime90k": 131603292000000, + "startTime90k": 131595516000000, + "totalDuration90k": 20946022 + } + } } - }, + } }, ... ], @@ -109,29 +116,33 @@ Example response: ```json { - "days": { - "2016-05-01": { - "endTime90k": 131595516000000, - "startTime90k": 131587740000000, - "totalDuration90k": 52617609 - }, - "2016-05-02": { - "endTime90k": 131603292000000, - "startTime90k": 131595516000000, - "totalDuration90k": 20946022 + "description": "", + "streams": { + "main": { + "days": { + "2016-05-01": { + "endTime90k": 131595516000000, + "startTime90k": 131587740000000, + "totalDuration90k": 52617609 + }, + "2016-05-02": { + "endTime90k": 131603292000000, + "startTime90k": 131595516000000, + "totalDuration90k": 20946022 + } + }, + "maxEndTime90k": 131598273666690, + "minStartTime90k": 131590386129355, + "retainBytes": 104857600, + "totalDuration90k": 73563631, + "totalSampleFileBytes": 98901406 } }, - "description": "", - "maxEndTime90k": 131598273666690, - "minStartTime90k": 131590386129355, - "retainBytes": 104857600, - "shortName": "driveway", - "totalDuration90k": 73563631, - "totalSampleFileBytes": 98901406 + "shortName": "driveway" } ``` -### `/api/cameras//recordings` +### `/api/cameras///recordings` A GET returns information about recordings, in descending order. @@ -175,7 +186,7 @@ Each recording object has the following properties: Example request URI (with added whitespace between parameters): ``` -/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/recordings +/api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/recordings ?startTime90k=130888729442361 &endTime90k=130985466591817 ``` @@ -204,7 +215,7 @@ Example response: } ``` -### `/api/cameras//view.mp4` +### `/api/cameras///view.mp4` A GET returns a `.mp4` file, with an etag and support for range requests. The MIME type will be `video/mp4`, with a `codecs` parameter as specified in [RFC @@ -230,27 +241,27 @@ Expected query parameters: Example request URI to retrieve all of recording id 1 from the given camera: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1 + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1 ``` Example request URI to retrieve all of recording ids 1–5 from the given camera, with timestamp subtitles: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1-5&ts=true + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1-5&ts=true ``` Example request URI to retrieve recording id 1, skipping its first 26 90,000ths of a second: ``` - /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1.26 + /api/cameras/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/main/view.mp4?s=1.26 ``` TODO: error behavior on missing segment. It should be a 404, likely with an `application/json` body describing what portion if any (still) exists. -### `/api/cameras//view.m4s` +### `/api/cameras///view.m4s` A GET returns a `.mp4` suitable for use as a [HTML5 Media Source Extensions media segment][media-segment]. The MIME type will be `video/mp4`, with a diff --git a/package.json b/package.json index a009a22..d984618 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "url": "https://github.com/scottlamb/moonfire-nvr/issues" }, "scripts": { - "build": "webpack && ln ui-src/index.html ui-dist/" + "build": "webpack && ln -f ui-src/index.html ui-dist/" }, "dependencies": { "jquery": "^3.2.1", diff --git a/src/cmds/config/cameras.rs b/src/cmds/config/cameras.rs index 9193837..3fd55ec 100644 --- a/src/cmds/config/cameras.rs +++ b/src/cmds/config/cameras.rs @@ -51,14 +51,13 @@ fn get_change(siv: &mut Cursive) -> db::CameraChange { let p = siv.find_id::("password").unwrap().get_content().as_str().into(); let m = siv.find_id::("main_rtsp_path").unwrap().get_content().as_str().into(); let s = siv.find_id::("sub_rtsp_path").unwrap().get_content().as_str().into(); - db::CameraChange{ + db::CameraChange { short_name: sn, description: d, host: h, username: u, password: p, - main_rtsp_path: m, - sub_rtsp_path: s, + rtsp_paths: [m, s], } } @@ -145,8 +144,20 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc, dir: &Arc("confirm").unwrap().get_content(); if decode_size(typed.as_str()).ok() == Some(to_delete) { siv.pop_layer(); // deletion confirmation dialog - if let Err(e) = dir::lower_retention(dir.clone(), - &[dir::NewLimit{camera_id: id, limit: 0}]) { + + let mut zero_limits = Vec::new(); + { + let l = db.lock(); + for (&stream_id, stream) in l.streams_by_id() { + if stream.camera_id == id { + zero_limits.push(dir::NewLimit { + stream_id, + limit: 0, + }); + } + } + } + if let Err(e) = dir::lower_retention(dir.clone(), &zero_limits) { siv.add_layer(views::Dialog::text(format!("Unable to delete recordings: {}", e)) .title("Error") .dismiss_button("Abort")); @@ -162,7 +173,6 @@ fn confirm_deletion(siv: &mut Cursive, db: &Arc, dir: &Arc, dir: &Arc, id: i32) { - info!("actually_delete call"); siv.pop_layer(); // get rid of the add/edit camera dialog. let result = { let mut l = db.lock(); @@ -198,14 +208,14 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .child(views::DummyView) .child(views::Button::new("Test", |siv| { let c = get_change(siv); - press_test(siv, &c, "main", &c.main_rtsp_path) + press_test(siv, &c, "main", &c.rtsp_paths[0]) }))) .child("sub_rtsp_path", views::LinearLayout::horizontal() .child(views::EditView::new().with_id("sub_rtsp_path").full_width()) .child(views::DummyView) .child(views::Button::new("Test", |siv| { let c = get_change(siv); - press_test(siv, &c, "sub", &c.sub_rtsp_path) + press_test(siv, &c, "sub", &c.rtsp_paths[1]) }))) .min_height(8); let layout = views::LinearLayout::vertical() @@ -214,22 +224,41 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .child(views::TextArea::new().with_id("description").min_height(3)) .full_width(); let mut dialog = views::Dialog::around(layout); - let dialog = if let Some(id) = *item { + let dialog = if let Some(camera_id) = *item { let l = db.lock(); - let camera = l.cameras_by_id().get(&id).expect("missing camera"); + let camera = l.cameras_by_id().get(&camera_id).expect("missing camera"); dialog.find_id("uuid", |v: &mut views::TextView| v.set_content(camera.uuid.to_string())) .expect("missing TextView"); - let bytes = camera.sample_file_bytes; + let mut main_rtsp_path = ""; + let mut sub_rtsp_path = ""; + let mut bytes = 0; + for (_, s) in l.streams_by_id() { + if s.camera_id != camera_id { continue; } + bytes += s.sample_file_bytes; + match s.type_ { + db::StreamType::MAIN => main_rtsp_path = &s.rtsp_path, + db::StreamType::SUB => sub_rtsp_path = &s.rtsp_path, + }; + } let name = camera.short_name.clone(); - for &(view_id, content) in &[("short_name", &camera.short_name), - ("host", &camera.host), - ("username", &camera.username), - ("password", &camera.password), - ("main_rtsp_path", &camera.main_rtsp_path), - ("sub_rtsp_path", &camera.sub_rtsp_path)] { + for &(view_id, content) in &[("short_name", &*camera.short_name), + ("host", &*camera.host), + ("username", &*camera.username), + ("password", &*camera.password), + ("main_rtsp_path", main_rtsp_path), + ("sub_rtsp_path", sub_rtsp_path)] { dialog.find_id(view_id, |v: &mut views::EditView| v.set_content(content.to_string())) .expect("missing EditView"); } + for s in l.streams_by_id().values() { + if s.camera_id != camera_id { continue }; + let id = match s.type_ { + db::StreamType::MAIN => "main_rtsp_path", + db::StreamType::SUB => "sub_rtsp_path", + }; + dialog.find_id(id, |v: &mut views::EditView| v.set_content(s.rtsp_path.to_string())) + .expect("missing EditView"); + } dialog.find_id("description", |v: &mut views::TextArea| v.set_content(camera.description.to_string())) .expect("missing TextArea"); @@ -237,12 +266,12 @@ fn edit_camera_dialog(db: &Arc, dir: &Arc, siv .button("Edit", { let db = db.clone(); let dir = dir.clone(); - move |s| press_edit(s, &db, &dir, Some(id)) + move |s| press_edit(s, &db, &dir, Some(camera_id)) }) .button("Delete", { let db = db.clone(); let dir = dir.clone(); - move |s| press_delete(s, &db, &dir, id, name.clone(), bytes) + move |s| press_delete(s, &db, &dir, camera_id, name.clone(), bytes) }) } else { dialog.title("Add camera") diff --git a/src/cmds/config/mod.rs b/src/cmds/config/mod.rs index 0894e8b..504aaff 100644 --- a/src/cmds/config/mod.rs +++ b/src/cmds/config/mod.rs @@ -144,7 +144,8 @@ pub fn run() -> Result<(), Error> { move |siv, item| item(&db, &dir, siv) }) .item("Edit cameras".to_string(), cameras::add_dialog) - .item("Edit retention".to_string(), retention::add_dialog)) + .item("Edit retention".to_string(), retention::add_dialog) + ) .button("Quit", |siv| siv.quit()) .title("Main menu")); diff --git a/src/cmds/config/retention.rs b/src/cmds/config/retention.rs index f0560a6..1ca4e34 100644 --- a/src/cmds/config/retention.rs +++ b/src/cmds/config/retention.rs @@ -42,7 +42,7 @@ use std::rc::Rc; use std::sync::Arc; use super::{decode_size, encode_size}; -struct Camera { +struct Stream { label: String, used: i64, retain: Option, // None if unparseable @@ -55,15 +55,15 @@ struct Model { total_used: i64, total_retain: i64, errors: isize, - cameras: BTreeMap, + streams: BTreeMap, } /// Updates the limits in the database. Doesn't delete excess data (if any). fn update_limits_inner(model: &Model) -> Result<(), Error> { let mut db = model.db.lock(); let mut tx = db.tx()?; - for (&id, camera) in &model.cameras { - tx.update_retention(id, camera.retain.unwrap())?; + for (&id, stream) in &model.streams { + tx.update_retention(id, stream.retain.unwrap())?; } tx.commit() } @@ -77,12 +77,12 @@ fn update_limits(model: &Model, siv: &mut Cursive) { } fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) { - info!("on_edit called for id {}", id); + debug!("on_edit called for id {}", id); let mut model = model.borrow_mut(); let model: &mut Model = &mut *model; - let camera = model.cameras.get_mut(&id).unwrap(); + let stream = model.streams.get_mut(&id).unwrap(); let new_value = decode_size(content).ok(); - let delta = new_value.unwrap_or(0) - camera.retain.unwrap_or(0); + let delta = new_value.unwrap_or(0) - stream.retain.unwrap_or(0); let old_errors = model.errors; if delta != 0 { let prev_over = model.total_retain > model.fs_capacity; @@ -91,7 +91,6 @@ fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) .unwrap() .set_content(encode_size(model.total_retain)); let now_over = model.total_retain > model.fs_capacity; - info!("now_over: {}", now_over); if now_over != prev_over { model.errors += if now_over { 1 } else { -1 }; siv.find_id::("total_ok") @@ -99,13 +98,13 @@ fn edit_limit(model: &RefCell, siv: &mut Cursive, id: i32, content: &str) .set_content(if now_over { "*" } else { " " }); } } - if new_value.is_none() != camera.retain.is_none() { + if new_value.is_none() != stream.retain.is_none() { model.errors += if new_value.is_none() { 1 } else { -1 }; siv.find_id::(&format!("{}_ok", id)) .unwrap() .set_content(if new_value.is_none() { "*" } else { " " }); } - camera.retain = new_value; + stream.retain = new_value; info!("model.errors = {}", model.errors); if (model.errors == 0) != (old_errors == 0) { info!("toggling change state: errors={}", model.errors); @@ -119,7 +118,7 @@ fn confirm_deletion(model: &RefCell, siv: &mut Cursive, to_delete: i64) { let typed = siv.find_id::("confirm") .unwrap() .get_content(); - info!("confirm, typed: {} vs expected: {}", typed.as_str(), to_delete); + debug!("confirm, typed: {} vs expected: {}", typed.as_str(), to_delete); if decode_size(typed.as_str()).ok() == Some(to_delete) { actually_delete(model, siv); } else { @@ -132,8 +131,8 @@ fn confirm_deletion(model: &RefCell, siv: &mut Cursive, to_delete: i64) { fn actually_delete(model: &RefCell, siv: &mut Cursive) { let model = &*model.borrow(); let new_limits: Vec<_> = - model.cameras.iter() - .map(|(&id, c)| dir::NewLimit{camera_id: id, limit: c.retain.unwrap()}) + model.streams.iter() + .map(|(&id, s)| dir::NewLimit {stream_id: id, limit: s.retain.unwrap()}) .collect(); siv.pop_layer(); // deletion confirmation siv.pop_layer(); // retention dialog @@ -150,11 +149,11 @@ fn press_change(model: &Rc>, siv: &mut Cursive) { if model.borrow().errors > 0 { return; } - let to_delete = model.borrow().cameras.values().map( - |c| ::std::cmp::max(c.used - c.retain.unwrap(), 0)).sum(); - info!("change press, to_delete={}", to_delete); + let to_delete = model.borrow().streams.values().map( + |s| ::std::cmp::max(s.used - s.retain.unwrap(), 0)).sum(); + debug!("change press, to_delete={}", to_delete); if to_delete > 0 { - let prompt = format!("Some cameras' usage exceeds new limit. Please confirm the amount \ + let prompt = format!("Some streams' usage exceeds new limit. Please confirm the amount \ of data to delete by typing it back:\n\n{}", encode_size(to_delete)); let dialog = views::Dialog::around( views::LinearLayout::vertical() @@ -179,19 +178,20 @@ fn press_change(model: &Rc>, siv: &mut Cursive) { pub fn add_dialog(db: &Arc, dir: &Arc, siv: &mut Cursive) { let model = { - let mut cameras = BTreeMap::new(); + let mut streams = BTreeMap::new(); let mut total_used = 0; let mut total_retain = 0; { let db = db.lock(); - for (&id, camera) in db.cameras_by_id() { - cameras.insert(id, Camera{ - label: format!("{}: {}", id, camera.short_name), - used: camera.sample_file_bytes, - retain: Some(camera.retain_bytes), + for (&id, s) in db.streams_by_id() { + let c = db.cameras_by_id().get(&s.camera_id).expect("stream without camera"); + streams.insert(id, Stream { + label: format!("{}: {}: {}", id, c.short_name, s.type_.as_str()), + used: s.sample_file_bytes, + retain: Some(s.retain_bytes), }); - total_used += camera.sample_file_bytes; - total_retain += camera.retain_bytes; + total_used += s.sample_file_bytes; + total_retain += s.retain_bytes; } } let stat = dir.statfs().unwrap(); @@ -199,27 +199,27 @@ pub fn add_dialog(db: &Arc, dir: &Arc, siv: &m Rc::new(RefCell::new(Model{ dir: dir.clone(), db: db.clone(), - fs_capacity: fs_capacity, - total_used: total_used, - total_retain: total_retain, + fs_capacity, + total_used, + total_retain, errors: (total_retain > fs_capacity) as isize, - cameras: cameras, + streams, })) }; let mut list = views::ListView::new(); list.add_child( - "camera", + "stream", views::LinearLayout::horizontal() .child(views::TextView::new("usage").fixed_width(25)) .child(views::TextView::new("limit").fixed_width(25))); - for (&id, camera) in &model.borrow().cameras { + for (&id, stream) in &model.borrow().streams { list.add_child( - &camera.label, + &stream.label, views::LinearLayout::horizontal() - .child(views::TextView::new(encode_size(camera.used)).fixed_width(25)) + .child(views::TextView::new(encode_size(stream.used)).fixed_width(25)) .child(views::EditView::new() - .content(encode_size(camera.retain.unwrap())) + .content(encode_size(stream.retain.unwrap())) .on_edit({ let model = model.clone(); move |siv, content, _pos| edit_limit(&model, siv, id, content) diff --git a/src/cmds/run.rs b/src/cmds/run.rs index bd69b6d..ddfea99 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -98,18 +98,21 @@ pub fn run() -> Result<(), Error> { &args.flag_db_dir, if args.flag_read_only { super::OpenMode::ReadOnly } else { super::OpenMode::ReadWrite })?; let db = Arc::new(db::Database::new(conn).unwrap()); + + // TODO: multiple sample file dirs. let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap(); info!("Database is loaded."); let s = web::Service::new(db.clone(), dir.clone(), Some(&args.flag_ui_dir), resolve_zone())?; - // Start a streamer for each camera. + // Start a streamer for each stream. + // TODO: enabled only. let shutdown_streamers = Arc::new(AtomicBool::new(false)); let mut streamers = Vec::new(); let syncer = if !args.flag_read_only { let (syncer_channel, syncer_join) = dir::start_syncer(dir.clone()).unwrap(); let l = db.lock(); - let cameras = l.cameras_by_id().len(); + let streams = l.streams_by_id().len(); let env = streamer::Environment{ db: &db, dir: &dir, @@ -117,12 +120,13 @@ pub fn run() -> Result<(), Error> { opener: &*stream::FFMPEG, shutdown: &shutdown_streamers, }; - for (i, (id, camera)) in l.cameras_by_id().iter().enumerate() { - let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / cameras as i64; + for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { + let camera = l.cameras_by_id().get(&stream.camera_id).unwrap(); + let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let mut streamer = streamer::Streamer::new(&env, syncer_channel.clone(), *id, camera, - rotate_offset_sec, + stream, rotate_offset_sec, streamer::ROTATE_INTERVAL_SEC); - let name = format!("stream-{}", streamer.short_name()); + let name = format!("s-{}", streamer.short_name()); streamers.push(thread::Builder::new().name(name).spawn(move|| { streamer.run(); }).expect("can't create thread")); diff --git a/src/db.rs b/src/db.rs index 79ed59e..8f7e06d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -65,6 +65,7 @@ use std::cell::RefCell; use std::cmp; use std::io::Write; use std::ops::Range; +use std::mem; use std::str; use std::string::String; use std::sync::Arc; @@ -73,7 +74,7 @@ use time; use uuid::Uuid; /// Expected schema version. See `guide/schema.md` for more information. -pub const EXPECTED_VERSION: i32 = 1; +pub const EXPECTED_VERSION: i32 = 2; const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" select @@ -110,10 +111,10 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" "#; const INSERT_RECORDING_SQL: &'static str = r#" - insert into recording (composite_id, camera_id, run_offset, flags, sample_file_bytes, + insert into recording (composite_id, stream_id, run_offset, flags, sample_file_bytes, start_time_90k, duration_90k, local_time_delta_90k, video_samples, video_sync_samples, video_sample_entry_id) - values (:composite_id, :camera_id, :run_offset, :flags, :sample_file_bytes, + values (:composite_id, :stream_id, :run_offset, :flags, :sample_file_bytes, :start_time_90k, :duration_90k, :local_time_delta_90k, :video_samples, :video_sync_samples, :video_sample_entry_id) "#; @@ -125,7 +126,7 @@ const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" "#; const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = - "update camera set next_recording_id = :next_recording_id where id = :camera_id"; + "update stream set next_recording_id = :next_recording_id where id = :stream_id"; const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" select @@ -152,24 +153,24 @@ const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#" delete from recording_playback where composite_id = :composite_id "#; -const CAMERA_MIN_START_SQL: &'static str = r#" +const STREAM_MIN_START_SQL: &'static str = r#" select start_time_90k from recording where - camera_id = :camera_id + stream_id = :stream_id order by start_time_90k limit 1 "#; -const CAMERA_MAX_START_SQL: &'static str = r#" +const STREAM_MAX_START_SQL: &'static str = r#" select start_time_90k, duration_90k from recording where - camera_id = :camera_id + stream_id = :stream_id order by start_time_90k desc; "#; @@ -236,7 +237,7 @@ pub struct ListRecordingsRow { pub start: recording::Time, pub video_sample_entry: Arc, - pub camera_id: i32, + pub stream_id: i32, pub id: i32, /// This is a recording::Duration, but a single recording's duration fits into an i32. @@ -257,7 +258,7 @@ pub struct ListAggregatedRecordingsRow { pub video_sync_samples: i64, pub sample_file_bytes: i64, pub video_sample_entry: Arc, - pub camera_id: i32, + pub stream_id: i32, pub flags: i32, pub run_start_id: i32, } @@ -286,7 +287,7 @@ pub enum RecordingFlags { /// A recording to pass to `insert_recording`. #[derive(Debug)] pub struct RecordingToInsert { - pub camera_id: i32, + pub stream_id: i32, pub run_offset: i32, pub flags: i32, pub sample_file_bytes: i32, @@ -304,7 +305,7 @@ pub struct RecordingToInsert { #[derive(Debug)] pub struct ListOldestSampleFilesRow { pub uuid: Uuid, - pub camera_id: i32, + pub stream_id: i32, pub recording_id: i32, pub time: Range, pub sample_file_bytes: i32, @@ -312,11 +313,11 @@ pub struct ListOldestSampleFilesRow { /// A calendar day in `YYYY-mm-dd` format. #[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] -pub struct CameraDayKey([u8; 10]); +pub struct StreamDayKey([u8; 10]); -impl CameraDayKey { +impl StreamDayKey { fn new(tm: time::Tm) -> Result { - let mut s = CameraDayKey([0u8; 10]); + let mut s = StreamDayKey([0u8; 10]); write!(&mut s.0[..], "{}", tm.strftime("%Y-%m-%d")?)?; Ok(s) } @@ -335,13 +336,13 @@ impl CameraDayKey { } } -impl AsRef for CameraDayKey { +impl AsRef for StreamDayKey { fn as_ref(&self) -> &str { str::from_utf8(&self.0[..]).expect("days are always UTF-8") } } /// In-memory state about a particular camera on a particular day. #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct CameraDayValue { +pub struct StreamDayValue { /// The number of recordings that overlap with this day. Note that `adjust_day` automatically /// prunes days with 0 recordings. pub recordings: i64, @@ -362,11 +363,53 @@ pub struct Camera { pub host: String, pub username: String, pub password: String, - pub main_rtsp_path: String, - pub sub_rtsp_path: String, + pub streams: [Option; 2], +} + +#[derive(Copy, Clone, Debug)] +pub enum StreamType { MAIN, SUB } + +impl StreamType { + pub fn from_index(i: usize) -> Option { + match i { + 0 => Some(StreamType::MAIN), + 1 => Some(StreamType::SUB), + _ => None, + } + } + + pub fn index(self) -> usize { + match self { + StreamType::MAIN => 0, + StreamType::SUB => 1, + } + } + + pub fn as_str(self) -> &'static str { + match self { + StreamType::MAIN => "main", + StreamType::SUB => "sub", + } + } + + pub fn parse(type_: &str) -> Option { + match type_ { + "main" => Some(StreamType::MAIN), + "sub" => Some(StreamType::SUB), + _ => None, + } + } +} + +#[derive(Debug)] +pub struct Stream { + pub id: i32, + pub camera_id: i32, + pub type_: StreamType, + pub rtsp_path: String, pub retain_bytes: i64, - /// The time range of recorded data associated with this camera (minimum start time and maximum + /// The time range of recorded data associated with this stream (minimum start time and maximum /// end time). `None` iff there are no recordings for this camera. pub range: Option>, pub sample_file_bytes: i64, @@ -376,8 +419,7 @@ pub struct Camera { pub duration: recording::Duration, /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. - pub days: BTreeMap, - + pub days: BTreeMap, next_recording_id: i32, } @@ -389,14 +431,13 @@ pub struct CameraChange { pub host: String, pub username: String, pub password: String, - pub main_rtsp_path: String, - pub sub_rtsp_path: String, + pub rtsp_paths: [String; 2], } /// Adds non-zero `delta` to the day represented by `day` in the map `m`. /// Inserts a map entry if absent; removes the entry if it has 0 entries on exit. -fn adjust_day(day: CameraDayKey, delta: CameraDayValue, - m: &mut BTreeMap) { +fn adjust_day(day: StreamDayKey, delta: StreamDayValue, + m: &mut BTreeMap) { use ::std::collections::btree_map::Entry; match m.entry(day) { Entry::Vacant(e) => { e.insert(delta); }, @@ -421,10 +462,10 @@ fn adjust_day(day: CameraDayKey, delta: CameraDayValue, /// This function swallows/logs date formatting errors because they shouldn't happen and there's /// not much that can be done about them. (The database operation has already gone through.) fn adjust_days(r: Range, sign: i64, - m: &mut BTreeMap) { + m: &mut BTreeMap) { // Find first day key. let mut my_tm = time::at(time::Timespec{sec: r.start.unix_seconds(), nsec: 0}); - let day = match CameraDayKey::new(my_tm) { + let day = match StreamDayKey::new(my_tm) { Ok(d) => d, Err(ref e) => { error!("Unable to fill first day key from {:?}: {}; will ignore.", my_tm, e); @@ -443,7 +484,7 @@ fn adjust_days(r: Range, sign: i64, let boundary_90k = boundary.sec * TIME_UNITS_PER_SEC; // Adjust the first day. - let first_day_delta = CameraDayValue{ + let first_day_delta = StreamDayValue{ recordings: sign, duration: recording::Duration(sign * (cmp::min(r.end.0, boundary_90k) - r.start.0)), }; @@ -456,21 +497,21 @@ fn adjust_days(r: Range, sign: i64, // Fill day with the second day. This requires a normalized representation so recalculate. // (The C mktime(3) already normalized for us once, but .to_timespec() discarded that result.) let my_tm = time::at(boundary); - let day = match CameraDayKey::new(my_tm) { + let day = match StreamDayKey::new(my_tm) { Ok(d) => d, Err(ref e) => { error!("Unable to fill second day key from {:?}: {}; will ignore.", my_tm, e); return; } }; - let second_day_delta = CameraDayValue{ + let second_day_delta = StreamDayValue{ recordings: sign, duration: recording::Duration(sign * (r.end.0 - boundary_90k)), }; adjust_day(day, second_day_delta, m); } -impl Camera { +impl Stream { /// Adds a single recording with the given properties to the in-memory state. fn add_recording(&mut self, r: Range, sample_file_bytes: i32) { self.range = Some(match self.range { @@ -484,9 +525,10 @@ impl Camera { } /// Initializes the recordings associated with the given camera. -fn init_recordings(conn: &mut rusqlite::Connection, camera_id: i32, camera: &mut Camera) +fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Camera, + stream: &mut Stream) -> Result<(), Error> { - info!("Loading recordings for camera {}", camera.short_name); + info!("Loading recordings for camera {} stream {:?}", camera.short_name, stream.type_); let mut stmt = conn.prepare(r#" select recording.start_time_90k, @@ -495,19 +537,19 @@ fn init_recordings(conn: &mut rusqlite::Connection, camera_id: i32, camera: &mut from recording where - camera_id = :camera_id + stream_id = :stream_id "#)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let mut i = 0; while let Some(row) = rows.next() { let row = row?; let start = recording::Time(row.get_checked(0)?); let duration = recording::Duration(row.get_checked(1)?); let bytes = row.get_checked(2)?; - camera.add_recording(start .. start + duration, bytes); + stream.add_recording(start .. start + duration, bytes); i += 1; } - info!("Loaded {} recordings for camera {}", i, camera.short_name); + info!("Loaded {} recordings for camera {} stream {:?}", i, camera.short_name, stream.type_); Ok(()) } @@ -523,6 +565,7 @@ pub struct LockedDatabase { #[derive(Debug)] struct State { cameras_by_id: BTreeMap, + streams_by_id: BTreeMap, cameras_by_uuid: BTreeMap, video_sample_entries: BTreeMap>, list_recordings_by_time_sql: String, @@ -533,7 +576,7 @@ struct State { /// be applied to the in-memory state on successful commit. pub struct Transaction<'a> { state: &'a mut State, - mods_by_camera: fnv::FnvHashMap, + mods_by_stream: fnv::FnvHashMap, tx: rusqlite::Transaction<'a>, /// True if due to an earlier error the transaction must be rolled back rather than committed. @@ -548,8 +591,8 @@ pub struct Transaction<'a> { pub bypass_reservation_for_testing: bool, } -/// A modification to be done to a `Camera` after a `Transaction` is committed. -struct CameraModification { +/// A modification to be done to a `Stream` after a `Transaction` is committed. +struct StreamModification { /// Add this to `camera.duration`. Thus, positive values indicate a net addition; /// negative values indicate a net subtraction. duration: recording::Duration, @@ -557,10 +600,10 @@ struct CameraModification { /// Add this to `camera.sample_file_bytes`. sample_file_bytes: i64, - /// Add this to `camera.days`. - days: BTreeMap, + /// Add this to `stream.days`. + days: BTreeMap, - /// Reset the Camera range to this value. This should be populated immediately prior to the + /// Reset the Stream range to this value. This should be populated immediately prior to the /// commit. range: Option>, @@ -571,8 +614,8 @@ struct CameraModification { new_retain_bytes: Option, } -fn composite_id(camera_id: i32, recording_id: i32) -> i64 { - (camera_id as i64) << 32 | recording_id as i64 +fn composite_id(stream_id: i32, recording_id: i32) -> i64 { + (stream_id as i64) << 32 | recording_id as i64 } impl<'a> Transaction<'a> { @@ -601,23 +644,23 @@ impl<'a> Transaction<'a> { self.check_must_rollback()?; self.must_rollback = true; for row in rows { - let composite_id = &composite_id(row.camera_id, row.recording_id); + let composite_id = &composite_id(row.stream_id, row.recording_id); let changes = del1.execute_named(&[(":composite_id", composite_id)])?; if changes != 1 { return Err(Error::new(format!("no such recording {}/{} (uuid {})", - row.camera_id, row.recording_id, row.uuid))); + row.stream_id, row.recording_id, row.uuid))); } let changes = del2.execute_named(&[(":composite_id", composite_id)])?; if changes != 1 { return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})", - row.camera_id, row.recording_id, row.uuid))); + row.stream_id, row.recording_id, row.uuid))); } let uuid = &row.uuid.as_bytes()[..]; insert.execute_named(&[ (":uuid", &uuid), (":state", &(ReservationState::Deleting as i64)) ])?; - let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, row.camera_id); + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, row.stream_id); m.duration -= row.time.end - row.time.start; m.sample_file_bytes -= row.sample_file_bytes as i64; adjust_days(row.time.clone(), -1, &mut m.days); @@ -656,9 +699,10 @@ impl<'a> Transaction<'a> { } // Unreserve the sample file uuid and insert the recording row. - let cam = match self.state.cameras_by_id.get_mut(&r.camera_id) { - None => return Err(Error::new(format!("no such camera id {}", r.camera_id))), - Some(c) => c, + // TODO: var used? + let stream = match self.state.streams_by_id.get_mut(&r.stream_id) { + None => return Err(Error::new(format!("no such stream id {}", r.stream_id))), + Some(s) => s, }; let uuid = &r.sample_file_uuid.as_bytes()[..]; { @@ -669,15 +713,15 @@ impl<'a> Transaction<'a> { } } self.must_rollback = true; - let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id); + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, r.stream_id); let recording_id; { - recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id); - let composite_id = composite_id(r.camera_id, recording_id); + recording_id = m.new_next_recording_id.unwrap_or(stream.next_recording_id); + let composite_id = composite_id(r.stream_id, recording_id); let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; stmt.execute_named(&[ (":composite_id", &composite_id), - (":camera_id", &(r.camera_id as i64)), + (":stream_id", &(r.stream_id as i64)), (":run_offset", &r.run_offset), (":flags", &r.flags), (":sample_file_bytes", &r.sample_file_bytes), @@ -699,7 +743,7 @@ impl<'a> Transaction<'a> { ])?; let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; stmt.execute_named(&[ - (":camera_id", &(r.camera_id as i64)), + (":stream_id", &(r.stream_id as i64)), (":next_recording_id", &m.new_next_recording_id), ])?; } @@ -710,22 +754,22 @@ impl<'a> Transaction<'a> { Ok(recording_id) } - /// Updates the `retain_bytes` for the given camera to the specified limit. + /// Updates the `retain_bytes` for the given stream to the specified limit. /// Note this just resets the limit in the database; it's the caller's responsibility to ensure /// current usage is under the new limit if desired. - pub fn update_retention(&mut self, camera_id: i32, new_limit: i64) -> Result<(), Error> { + pub fn update_retention(&mut self, stream_id: i32, new_limit: i64) -> Result<(), Error> { if new_limit < 0 { - return Err(Error::new(format!("can't set limit for camera {} to {}; must be >= 0", - camera_id, new_limit))); + return Err(Error::new(format!("can't set limit for stream {} to {}; must be >= 0", + stream_id, new_limit))); } self.check_must_rollback()?; let mut stmt = - self.tx.prepare_cached("update camera set retain_bytes = :retain where id = :id")?; - let changes = stmt.execute_named(&[(":retain", &new_limit), (":id", &camera_id)])?; + self.tx.prepare_cached("update stream set retain_bytes = :retain where id = :id")?; + let changes = stmt.execute_named(&[(":retain", &new_limit), (":id", &stream_id)])?; if changes != 1 { - return Err(Error::new(format!("no such camera {}", camera_id))); + return Err(Error::new(format!("no such stream {}", stream_id))); } - let m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, camera_id); + let m = Transaction::get_mods_by_stream(&mut self.mods_by_stream, stream_id); m.new_retain_bytes = Some(new_limit); Ok(()) } @@ -735,20 +779,20 @@ impl<'a> Transaction<'a> { self.check_must_rollback()?; self.precommit()?; self.tx.commit()?; - for (&camera_id, m) in &self.mods_by_camera { - let camera = self.state.cameras_by_id.get_mut(&camera_id) - .expect("modified camera must exist"); - camera.duration += m.duration; - camera.sample_file_bytes += m.sample_file_bytes; + for (&stream_id, m) in &self.mods_by_stream { + let stream = self.state.streams_by_id.get_mut(&stream_id) + .expect("modified stream must exist"); + stream.duration += m.duration; + stream.sample_file_bytes += m.sample_file_bytes; for (k, v) in &m.days { - adjust_day(*k, *v, &mut camera.days); + adjust_day(*k, *v, &mut stream.days); } - camera.range = m.range.clone(); + stream.range = m.range.clone(); if let Some(id) = m.new_next_recording_id { - camera.next_recording_id = id; + stream.next_recording_id = id; } if let Some(b) = m.new_retain_bytes { - camera.retain_bytes = b; + stream.retain_bytes = b; } } Ok(()) @@ -762,11 +806,11 @@ impl<'a> Transaction<'a> { Ok(()) } - /// Looks up an existing entry in `mods` for a given camera or makes+inserts an identity entry. - fn get_mods_by_camera(mods: &mut fnv::FnvHashMap, camera_id: i32) - -> &mut CameraModification { - mods.entry(camera_id).or_insert_with(|| { - CameraModification{ + /// Looks up an existing entry in `mods` for a given stream or makes+inserts an identity entry. + fn get_mods_by_stream(mods: &mut fnv::FnvHashMap, stream_id: i32) + -> &mut StreamModification { + mods.entry(stream_id).or_insert_with(|| { + StreamModification{ duration: recording::Duration(0), sample_file_bytes: 0, range: None, @@ -777,14 +821,14 @@ impl<'a> Transaction<'a> { }) } - /// Fills the `range` of each `CameraModification`. This is done prior to commit so that if the + /// Fills the `range` of each `StreamModification`. This is done prior to commit so that if the /// commit succeeds, there's no possibility that the correct state can't be retrieved. fn precommit(&mut self) -> Result<(), Error> { // Recompute start and end times for each camera. - for (&camera_id, m) in &mut self.mods_by_camera { + for (&stream_id, m) in &mut self.mods_by_stream { // The minimum is straightforward, taking advantage of the start_time_90k index. - let mut stmt = self.tx.prepare_cached(CAMERA_MIN_START_SQL)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut stmt = self.tx.prepare_cached(STREAM_MIN_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let min_start = match rows.next() { Some(row) => recording::Time(row?.get_checked(0)?), None => continue, // no data; leave m.range alone. @@ -794,8 +838,8 @@ impl<'a> Transaction<'a> { // straightforward because recordings could overlap. All recordings starting in the // last MAX_RECORDING_DURATION must be examined in order to take advantage of the // start_time_90k index. - let mut stmt = self.tx.prepare_cached(CAMERA_MAX_START_SQL)?; - let mut rows = stmt.query_named(&[(":camera_id", &camera_id)])?; + let mut stmt = self.tx.prepare_cached(STREAM_MAX_START_SQL)?; + let mut rows = stmt.query_named(&[(":stream_id", &stream_id)])?; let mut maxes_opt = None; while let Some(row) = rows.next() { let row = row?; @@ -814,8 +858,8 @@ impl<'a> Transaction<'a> { let max_end = match maxes_opt { Some(Range{end: e, ..}) => e, None => { - return Err(Error::new(format!("missing max for camera {} which had min {}", - camera_id, min_start))); + return Err(Error::new(format!("missing max for stream {} which had min {}", + stream_id, min_start))); } }; m.range = Some(min_start .. max_end); @@ -824,9 +868,54 @@ impl<'a> Transaction<'a> { } } +struct StreamInserter<'tx> { + tx: &'tx rusqlite::Transaction<'tx>, + stmt: rusqlite::Statement<'tx>, + new_streams: BTreeMap, +} + +impl<'tx> StreamInserter<'tx> { + fn new(tx: &'tx rusqlite::Transaction) -> Result { + let stmt = tx.prepare(r#" + insert into stream (camera_id, type, rtsp_path, retain_bytes, next_recording_id) + values (:camera_id, :type, :rtsp_path, 0, 1) + "#)?; + Ok(StreamInserter { + tx, + stmt, + new_streams: BTreeMap::new(), + }) + } + + fn add(&mut self, camera_id: i32, type_: StreamType, rtsp_path: String) -> Result<(), Error> { + self.stmt.execute_named(&[ + (":camera_id", &camera_id), + (":type", &type_.as_str()), + (":rtsp_path", &rtsp_path) + ])?; + let id = self.tx.last_insert_rowid() as i32; + self.new_streams.insert(id, Stream { + id, + type_, + camera_id, + rtsp_path, + retain_bytes: 0, + range: None, + sample_file_bytes: 0, + duration: recording::Duration(0), + days: BTreeMap::new(), + next_recording_id: 1, + }); + Ok(()) + } + + fn streams(self) -> BTreeMap { self.new_streams } +} + impl LockedDatabase { /// Returns an immutable view of the cameras by id. pub fn cameras_by_id(&self) -> &BTreeMap { &self.state.cameras_by_id } + pub fn streams_by_id(&self) -> &BTreeMap { &self.state.streams_by_id } /// Returns an immutable view of the video sample entries. pub fn video_sample_entries(&self) -> btree_map::Values> { @@ -840,7 +929,7 @@ impl LockedDatabase { pub fn tx(&mut self) -> Result { Ok(Transaction{ state: &mut self.state, - mods_by_camera: fnv::FnvHashMap::default(), + mods_by_stream: fnv::FnvHashMap::default(), tx: self.conn.transaction()?, must_rollback: false, bypass_reservation_for_testing: false, @@ -857,30 +946,30 @@ impl LockedDatabase { /// Lists the specified recordings in ascending order by start time, passing them to a supplied /// function. Given that the function is called with the database lock held, it should be quick. - pub fn list_recordings_by_time(&self, camera_id: i32, desired_time: Range, + pub fn list_recordings_by_time(&self, stream_id: i32, desired_time: Range, f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?; let rows = stmt.query_named(&[ - (":camera_id", &camera_id), + (":stream_id", &stream_id), (":start_time_90k", &desired_time.start.0), (":end_time_90k", &desired_time.end.0)])?; - self.list_recordings_inner(camera_id, rows, f) + self.list_recordings_inner(stream_id, rows, f) } /// Lists the specified recordigs in ascending order by id. - pub fn list_recordings_by_id(&self, camera_id: i32, desired_ids: Range, f: F) + pub fn list_recordings_by_id(&self, stream_id: i32, desired_ids: Range, f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; let rows = stmt.query_named(&[ - (":start", &composite_id(camera_id, desired_ids.start)), - (":end", &composite_id(camera_id, desired_ids.end)), + (":start", &composite_id(stream_id, desired_ids.start)), + (":end", &composite_id(stream_id, desired_ids.end)), ])?; - self.list_recordings_inner(camera_id, rows, f) + self.list_recordings_inner(stream_id, rows, f) } - fn list_recordings_inner(&self, camera_id: i32, mut rows: rusqlite::Rows, mut f: F) + fn list_recordings_inner(&self, stream_id: i32, mut rows: rusqlite::Rows, mut f: F) -> Result<(), Error> where F: FnMut(ListRecordingsRow) -> Result<(), Error> { while let Some(row) = rows.next() { @@ -892,12 +981,12 @@ impl LockedDatabase { None => { return Err(Error::new(format!( "recording {}/{} references nonexistent video_sample_entry {}", - camera_id, id, vse_id))); + stream_id, id, vse_id))); }, }; let out = ListRecordingsRow{ - camera_id: camera_id, - id: id, + stream_id, + id, run_offset: row.get_checked(1)?, flags: row.get_checked(2)?, start: recording::Time(row.get_checked(3)?), @@ -915,7 +1004,7 @@ impl LockedDatabase { /// Calls `list_recordings_by_time` and aggregates consecutive recordings. /// Rows are given to the callback in arbitrary order. Callers which care about ordering /// should do their own sorting. - pub fn list_aggregated_recordings(&self, camera_id: i32, + pub fn list_aggregated_recordings(&self, stream_id: i32, desired_time: Range, forced_split: recording::Duration, mut f: F) -> Result<(), Error> @@ -934,7 +1023,7 @@ impl LockedDatabase { // their timestamps overlap. Tracking all active runs prevents that interleaving from // causing problems.) let mut aggs: BTreeMap = BTreeMap::new(); - self.list_recordings_by_time(camera_id, desired_time, |row| { + self.list_recordings_by_time(stream_id, desired_time, |row| { let run_start_id = row.id - row.run_offset; let needs_flush = if let Some(a) = aggs.get(&run_start_id) { let new_dur = a.time.end - a.time.start + @@ -951,8 +1040,8 @@ impl LockedDatabase { let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) { if a.time.end != row.start { return Err(Error::new(format!( - "camera {} recording {} ends at {}; {} starts at {}; expected same", - camera_id, a.ids.end - 1, a.time.end, row.id, row.start))); + "stream {} recording {} ends at {}; {} starts at {}; expected same", + stream_id, a.ids.end - 1, a.time.end, row.id, row.start))); } a.time.end.0 += row.duration_90k as i64; a.ids.end = row.id + 1; @@ -971,7 +1060,7 @@ impl LockedDatabase { video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, video_sample_entry: row.video_sample_entry, - camera_id: camera_id, + stream_id, run_start_id: row.id - row.run_offset, flags: row.flags, }); @@ -987,16 +1076,16 @@ impl LockedDatabase { /// Calls `f` with a single `recording_playback` row. /// Note the lock is held for the duration of `f`. /// This uses a LRU cache to reduce the number of retrievals from the database. - pub fn with_recording_playback(&self, camera_id: i32, recording_id: i32, f: F) + pub fn with_recording_playback(&self, stream_id: i32, recording_id: i32, f: F) -> Result where F: FnOnce(&RecordingPlayback) -> Result { - let composite_id = composite_id(camera_id, recording_id); + let composite_id = composite_id(stream_id, recording_id); let mut cache = self.state.playback_cache.borrow_mut(); if let Some(r) = cache.get_mut(&composite_id) { - trace!("cache hit for recording {}/{}", camera_id, recording_id); + trace!("cache hit for recording {}/{}", stream_id, recording_id); return f(&RecordingPlayback::new(r)); } - trace!("cache miss for recording {}/{}", camera_id, recording_id); + trace!("cache miss for recording {}/{}", stream_id, recording_id); let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?; let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?; if let Some(row) = rows.next() { @@ -1011,7 +1100,7 @@ impl LockedDatabase { cache.insert(composite_id, data); return result; } - Err(Error::new(format!("no such recording {}/{}", camera_id, recording_id))) + Err(Error::new(format!("no such recording {}/{}", stream_id, recording_id))) } /// Lists all reserved sample files. @@ -1029,12 +1118,12 @@ impl LockedDatabase { /// Lists the oldest sample files (to delete to free room). /// `f` should return true as long as further rows are desired. - pub fn list_oldest_sample_files(&self, camera_id: i32, mut f: F) -> Result<(), Error> + pub fn list_oldest_sample_files(&self, stream_id: i32, mut f: F) -> Result<(), Error> where F: FnMut(ListOldestSampleFilesRow) -> bool { let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; let mut rows = stmt.query_named(&[ - (":start", &composite_id(camera_id, 0)), - (":end", &composite_id(camera_id + 1, 0)), + (":start", &composite_id(stream_id, 0)), + (":end", &composite_id(stream_id + 1, 0)), ])?; while let Some(row) = rows.next() { let row = row?; @@ -1044,7 +1133,7 @@ impl LockedDatabase { let uuid: FromSqlUuid = row.get_checked(1)?; let should_continue = f(ListOldestSampleFilesRow{ recording_id: composite_id as i32, - camera_id: (composite_id >> 32) as i32, + stream_id: (composite_id >> 32) as i32, uuid: uuid.0, time: start .. start + duration, sample_file_bytes: row.get_checked(4)?, @@ -1106,17 +1195,13 @@ impl LockedDatabase { info!("Loading cameras"); let mut stmt = self.conn.prepare(r#" select - camera.id, - camera.uuid, - camera.short_name, - camera.description, - camera.host, - camera.username, - camera.password, - camera.main_rtsp_path, - camera.sub_rtsp_path, - camera.retain_bytes, - next_recording_id + id, + uuid, + short_name, + description, + host, + username, + password from camera; "#)?; @@ -1125,7 +1210,7 @@ impl LockedDatabase { let row = row?; let id = row.get_checked(0)?; let uuid: FromSqlUuid = row.get_checked(1)?; - self.state.cameras_by_id.insert(id, Camera{ + self.state.cameras_by_id.insert(id, Camera { id: id, uuid: uuid.0, short_name: row.get_checked(2)?, @@ -1133,14 +1218,7 @@ impl LockedDatabase { host: row.get_checked(4)?, username: row.get_checked(5)?, password: row.get_checked(6)?, - main_rtsp_path: row.get_checked(7)?, - sub_rtsp_path: row.get_checked(8)?, - retain_bytes: row.get_checked(9)?, - range: None, - sample_file_bytes: 0, - duration: recording::Duration(0), - days: BTreeMap::new(), - next_recording_id: row.get_checked(10)?, + streams: Default::default(), }); self.state.cameras_by_uuid.insert(uuid.0, id); } @@ -1148,6 +1226,49 @@ impl LockedDatabase { Ok(()) } + /// Initializes the streams, but not their matching recordings. + /// To be called during construction. + fn init_streams(&mut self) -> Result<(), Error> { + info!("Loading streams"); + let mut stmt = self.conn.prepare(r#" + select + id, + type, + camera_id, + rtsp_path, + retain_bytes, + next_recording_id + from + stream; + "#)?; + let mut rows = stmt.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id = row.get_checked(0)?; + let type_: String = row.get_checked(1)?; + let type_ = StreamType::parse(&type_).ok_or_else( + || Error::new(format!("no such stream type {}", type_)))?; + let camera_id = row.get_checked(2)?; + self.state.streams_by_id.insert(id, Stream { + id, + type_, + camera_id, + rtsp_path: row.get_checked(3)?, + retain_bytes: row.get_checked(4)?, + range: None, + sample_file_bytes: 0, + duration: recording::Duration(0), + days: BTreeMap::new(), + next_recording_id: row.get_checked(5)?, + }); + let c = self.state.cameras_by_id.get_mut(&camera_id) + .ok_or_else(|| Error::new("missing camera".to_owned()))?; + c.streams[type_.index()] = Some(id); + } + info!("Loaded {} streams", self.state.streams_by_id.len()); + Ok(()) + } + /// Inserts the specified video sample entry if absent. /// On success, returns the id of a new or existing row. pub fn insert_video_sample_entry(&mut self, w: u16, h: u16, data: Vec, @@ -1192,97 +1313,162 @@ impl LockedDatabase { } /// Adds a camera. - pub fn add_camera(&mut self, camera: CameraChange) -> Result { + pub fn add_camera(&mut self, mut camera: CameraChange) -> Result { let uuid = Uuid::new_v4(); let uuid_bytes = &uuid.as_bytes()[..]; - let mut stmt = self.conn.prepare_cached(r#" - insert into camera (uuid, short_name, description, host, username, password, - main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id) - values (:uuid, :short_name, :description, :host, :username, :password, - :main_rtsp_path, :sub_rtsp_path, 0, 1) - "#)?; - stmt.execute_named(&[ - (":uuid", &uuid_bytes), - (":short_name", &camera.short_name), - (":description", &camera.description), - (":host", &camera.host), - (":username", &camera.username), - (":password", &camera.password), - (":main_rtsp_path", &camera.main_rtsp_path), - (":sub_rtsp_path", &camera.sub_rtsp_path), - ])?; - let id = self.conn.last_insert_rowid() as i32; - self.state.cameras_by_id.insert(id, Camera{ - id: id, - uuid: uuid, + let tx = self.conn.transaction()?; + let mut new_streams; + let camera_id; + { + let mut stmt = tx.prepare_cached(r#" + insert into camera (uuid, short_name, description, host, username, password) + values (:uuid, :short_name, :description, :host, :username, :password) + "#)?; + stmt.execute_named(&[ + (":uuid", &uuid_bytes), + (":short_name", &camera.short_name), + (":description", &camera.description), + (":host", &camera.host), + (":username", &camera.username), + (":password", &camera.password), + ])?; + camera_id = tx.last_insert_rowid() as i32; + let mut inserter = StreamInserter::new(&tx)?; + for (i, ref mut rtsp_path) in camera.rtsp_paths.iter_mut().enumerate() { + if rtsp_path.is_empty() { continue; } + inserter.add(camera_id, StreamType::from_index(i).unwrap(), + mem::replace(rtsp_path, String::new()))?; + } + new_streams = inserter.streams(); + } + tx.commit()?; + let mut streams = [None, None]; + for (&id, s) in &new_streams { + streams[s.type_.index()] = Some(id); + } + self.state.streams_by_id.append(&mut new_streams); + self.state.cameras_by_id.insert(camera_id, Camera { + id: camera_id, + uuid, short_name: camera.short_name, description: camera.description, host: camera.host, username: camera.username, password: camera.password, - main_rtsp_path: camera.main_rtsp_path, - sub_rtsp_path: camera.sub_rtsp_path, - retain_bytes: 0, - range: None, - sample_file_bytes: 0, - duration: recording::Duration(0), - days: BTreeMap::new(), - next_recording_id: 1, + streams, }); - self.state.cameras_by_uuid.insert(uuid, id); - Ok(id) + self.state.cameras_by_uuid.insert(uuid, camera_id); + Ok(camera_id) } /// Updates a camera. - pub fn update_camera(&mut self, id: i32, camera: CameraChange) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(r#" - update camera set - short_name = :short_name, - description = :description, - host = :host, - username = :username, - password = :password, - main_rtsp_path = :main_rtsp_path, - sub_rtsp_path = :sub_rtsp_path - where - id = :id - "#)?; - stmt.execute_named(&[ - (":id", &id), - (":short_name", &camera.short_name), - (":description", &camera.description), - (":host", &camera.host), - (":username", &camera.username), - (":password", &camera.password), - (":main_rtsp_path", &camera.main_rtsp_path), - (":sub_rtsp_path", &camera.sub_rtsp_path), - ])?; - let c = self.state.cameras_by_id.get_mut(&id).unwrap(); + pub fn update_camera(&mut self, camera_id: i32, mut camera: CameraChange) -> Result<(), Error> { + let tx = self.conn.transaction()?; + let mut new_streams; + let mut stream_rtsp_changes = BTreeMap::new(); + { + let mut stream_ids = [None; 2]; + let mut stream_update_stmt = tx.prepare_cached(r#" + update stream set + rtsp_path = :rtsp_path + where + id = :id + "#)?; + for (&stream_id, stream) in &self.state.streams_by_id { + if stream.camera_id != camera_id { + continue; + } + stream_ids[stream.type_.index()] = Some(stream_id); + let p = mem::replace(&mut camera.rtsp_paths[stream.type_.index()], String::new()); + let rows = stream_update_stmt.execute_named(&[ + (":id", &stream_id), + (":rtsp_path", &p), + ])?; + if rows != 1 { + return Err(Error::new(format!("Stream {} missing from database", + stream_id))); + } + stream_rtsp_changes.insert(stream_id, p); + } + let mut inserter = StreamInserter::new(&tx)?; + for (index, id) in stream_ids.iter().enumerate() { + if id.is_none() && !camera.rtsp_paths[index].is_empty() { + inserter.add(camera_id, StreamType::from_index(index).unwrap(), + mem::replace(&mut camera.rtsp_paths[index], String::new()))?; + } + } + new_streams = inserter.streams(); + let mut stmt = tx.prepare_cached(r#" + update camera set + short_name = :short_name, + description = :description, + host = :host, + username = :username, + password = :password + where + id = :id + "#)?; + let rows = stmt.execute_named(&[ + (":id", &camera_id), + (":short_name", &camera.short_name), + (":description", &camera.description), + (":host", &camera.host), + (":username", &camera.username), + (":password", &camera.password), + ])?; + if rows != 1 { + return Err(Error::new(format!("Camera {} missing from database", camera_id))); + } + } + tx.commit()?; + let c = self.state.cameras_by_id.get_mut(&camera_id).unwrap(); c.short_name = camera.short_name; c.description = camera.description; c.host = camera.host; c.username = camera.username; c.password = camera.password; - c.main_rtsp_path = camera.main_rtsp_path; - c.sub_rtsp_path = camera.sub_rtsp_path; + for (&id, s) in &new_streams { + c.streams[s.type_.index()] = Some(id); + } + self.state.streams_by_id.append(&mut new_streams); + for (id, p) in &mut stream_rtsp_changes { + let mut s = self.state.streams_by_id.get_mut(id) + .ok_or_else(|| Error::new(format!("stream {} missing", id)))?; + mem::swap(&mut s.rtsp_path, p); + } Ok(()) } - /// Deletes a camera. The camera must have no recordings. + /// Deletes a camera and its streams. The camera must have no recordings. pub fn delete_camera(&mut self, id: i32) -> Result<(), Error> { - let (has_recordings, uuid) = - self.state.cameras_by_id.get(&id) - .map(|c| (c.range.is_some(), c.uuid)) - .ok_or_else(|| Error::new(format!("No such camera {} to remove", id)))?; - if has_recordings { - return Err(Error::new(format!("Can't remove camera {}; has recordings.", id))); - }; - let mut stmt = self.conn.prepare_cached(r"delete from camera where id = :id")?; - let rows = stmt.execute_named(&[(":id", &id)])?; - if rows != 1 { - return Err(Error::new(format!("Camera {} missing from database", id))); + let uuid = self.state.cameras_by_id.get(&id) + .map(|c| c.uuid) + .ok_or_else(|| Error::new(format!("No such camera {} to remove", id)))?; + let mut streams_to_delete = Vec::new(); + let tx = self.conn.transaction()?; + { + let mut stream_stmt = tx.prepare_cached(r"delete from stream where id = :id")?; + for (stream_id, stream) in &self.state.streams_by_id { + if stream.camera_id != id { continue }; + if stream.range.is_some() { + return Err(Error::new(format!("Can't remove camera {}; has recordings.", id))); + } + let rows = stream_stmt.execute_named(&[(":id", stream_id)])?; + if rows != 1 { + return Err(Error::new(format!("Stream {} missing from database", id))); + } + streams_to_delete.push(*stream_id); + } + let mut cam_stmt = tx.prepare_cached(r"delete from camera where id = :id")?; + let rows = cam_stmt.execute_named(&[(":id", &id)])?; + if rows != 1 { + return Err(Error::new(format!("Camera {} missing from database", id))); + } + } + tx.commit()?; + for id in streams_to_delete { + self.state.streams_by_id.remove(&id); } - self.state.cameras_by_id.remove(&id); self.state.cameras_by_uuid.remove(&uuid); return Ok(()) } @@ -1326,7 +1512,7 @@ impl Database { from recording where - camera_id = :camera_id and + stream_id = :stream_id and recording.start_time_90k > :start_time_90k - {} and recording.start_time_90k < :end_time_90k and recording.start_time_90k + recording.duration_90k > :start_time_90k @@ -1357,9 +1543,10 @@ impl Database { } let db = Database(Mutex::new(LockedDatabase{ conn: conn, - state: State{ + state: State { cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), + streams_by_id: BTreeMap::new(), video_sample_entries: BTreeMap::new(), playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), list_recordings_by_time_sql: list_recordings_by_time_sql, @@ -1369,9 +1556,11 @@ impl Database { let l = &mut *db.lock(); l.init_video_sample_entries().annotate_err("init_video_sample_entries")?; l.init_cameras().annotate_err("init_cameras")?; - for (&camera_id, ref mut camera) in &mut l.state.cameras_by_id { - // TODO: we could use one thread per camera if we had multiple db conns. - init_recordings(&mut l.conn, camera_id, camera) + l.init_streams().annotate_err("init_streams")?; + for (&stream_id, ref mut stream) in &mut l.state.streams_by_id { + // TODO: we could use one thread per stream if we had multiple db conns. + let camera = l.state.cameras_by_id.get(&stream.camera_id).unwrap(); + init_recordings(&mut l.conn, stream_id, camera, stream) .annotate_err("init_recordings")?; } } @@ -1422,21 +1611,22 @@ mod tests { assert_eq!("test-camera", row.host); assert_eq!("foo", row.username); assert_eq!("bar", row.password); - assert_eq!("/main", row.main_rtsp_path); - assert_eq!("/sub", row.sub_rtsp_path); - assert_eq!(42, row.retain_bytes); - assert_eq!(None, row.range); - assert_eq!(recording::Duration(0), row.duration); - assert_eq!(0, row.sample_file_bytes); + //assert_eq!("/main", row.main_rtsp_path); + //assert_eq!("/sub", row.sub_rtsp_path); + //assert_eq!(42, row.retain_bytes); + //assert_eq!(None, row.range); + //assert_eq!(recording::Duration(0), row.duration); + //assert_eq!(0, row.sample_file_bytes); } } assert_eq!(1, rows); + let stream_id = camera_id; // TODO rows = 0; { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); - db.list_recordings_by_time(camera_id, all_time, |_row| { + db.list_recordings_by_time(stream_id, all_time, |_row| { rows += 1; Ok(()) }).unwrap(); @@ -1444,30 +1634,24 @@ mod tests { assert_eq!(0, rows); } - fn assert_single_recording(db: &Database, camera_uuid: Uuid, r: &RecordingToInsert) { - let mut rows = 0; - let mut camera_id = -1; + fn assert_single_recording(db: &Database, stream_id: i32, r: &RecordingToInsert) { { let db = db.lock(); - for row in db.cameras_by_id().values() { - rows += 1; - camera_id = row.id; - assert_eq!(camera_uuid, row.uuid); - assert_eq!(Some(r.time.clone()), row.range); - assert_eq!(r.sample_file_bytes as i64, row.sample_file_bytes); - assert_eq!(r.time.end - r.time.start, row.duration); - } + let stream = db.streams_by_id().get(&stream_id).unwrap(); + assert_eq!(Some(r.time.clone()), stream.range); + assert_eq!(r.sample_file_bytes as i64, stream.sample_file_bytes); + assert_eq!(r.time.end - r.time.start, stream.duration); + db.cameras_by_id().get(&stream.camera_id).unwrap(); } - assert_eq!(1, rows); // TODO(slamb): test that the days logic works correctly. - rows = 0; + let mut rows = 0; let mut recording_id = -1; { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); - db.list_recordings_by_time(camera_id, all_time, |row| { + db.list_recordings_by_time(stream_id, all_time, |row| { rows += 1; recording_id = row.id; assert_eq!(r.time, @@ -1482,7 +1666,7 @@ mod tests { assert_eq!(1, rows); rows = 0; - db.lock().list_oldest_sample_files(camera_id, |row| { + db.lock().list_oldest_sample_files(stream_id, |row| { rows += 1; assert_eq!(recording_id, row.recording_id); assert_eq!(r.sample_file_uuid, row.uuid); @@ -1514,21 +1698,21 @@ mod tests { let two_min = recording::Duration(2 * 60 * TIME_UNITS_PER_SEC); let three_min = recording::Duration(3 * 60 * TIME_UNITS_PER_SEC); let four_min = recording::Duration(4 * 60 * TIME_UNITS_PER_SEC); - let test_day1 = &CameraDayKey(*b"2015-12-31"); - let test_day2 = &CameraDayKey(*b"2016-01-01"); + let test_day1 = &StreamDayKey(*b"2015-12-31"); + let test_day2 = &StreamDayKey(*b"2016-01-01"); adjust_days(test_time .. test_time + one_min, 1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); // Add to a day. adjust_days(test_time .. test_time + one_min, 1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); // Subtract from a day. adjust_days(test_time .. test_time + one_min, -1, &mut m); assert_eq!(1, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); // Remove a day. adjust_days(test_time .. test_time + one_min, -1, &mut m); @@ -1537,20 +1721,20 @@ mod tests { // Create two days. adjust_days(test_time .. test_time + three_min, 1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); // Add to two days. adjust_days(test_time .. test_time + three_min, 1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 2, duration: four_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: two_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 2, duration: four_min}), m.get(test_day2)); // Subtract from two days. adjust_days(test_time .. test_time + three_min, -1, &mut m); assert_eq!(2, m.len()); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); - assert_eq!(Some(&CameraDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: one_min}), m.get(test_day1)); + assert_eq!(Some(&StreamDayValue{recordings: 1, duration: two_min}), m.get(test_day2)); // Remove two days. adjust_days(test_time .. test_time + three_min, -1, &mut m); @@ -1560,11 +1744,11 @@ mod tests { #[test] fn test_day_bounds() { testutil::init(); - assert_eq!(CameraDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs) + assert_eq!(StreamDayKey(*b"2017-10-10").bounds(), // normal day (24 hrs) recording::Time(135685692000000) .. recording::Time(135693468000000)); - assert_eq!(CameraDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs) + assert_eq!(StreamDayKey(*b"2017-03-12").bounds(), // spring forward (23 hrs) recording::Time(134037504000000) .. recording::Time(134044956000000)); - assert_eq!(CameraDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs) + assert_eq!(StreamDayKey(*b"2017-11-05").bounds(), // fall back (25 hrs) recording::Time(135887868000000) .. recording::Time(135895968000000)); } @@ -1579,10 +1763,10 @@ mod tests { fn test_version_too_old() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (0, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 0 is too old (expected 1)"), "got: {:?}", + "Database schema version 1 is too old (expected 2)"), "got: {:?}", e.description()); } @@ -1590,10 +1774,10 @@ mod tests { fn test_version_too_new() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (3, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 2 is too new (expected 1)"), "got: {:?}", e.description()); + "Database schema version 3 is too new (expected 2)"), "got: {:?}", e.description()); } /// Basic test of running some queries on a fresh database. @@ -1618,13 +1802,16 @@ mod tests { host: "test-camera".to_owned(), username: "foo".to_owned(), password: "bar".to_owned(), - main_rtsp_path: "/main".to_owned(), - sub_rtsp_path: "/sub".to_owned(), + rtsp_paths: [ + "/main".to_owned(), + "/sub".to_owned(), + ], }).unwrap(); { let mut l = db.lock(); + let stream_id = l.cameras_by_id().get(&camera_id).unwrap().streams[0].unwrap(); let mut tx = l.tx().unwrap(); - tx.update_retention(camera_id, 42).unwrap(); + tx.update_retention(stream_id, 42).unwrap(); tx.commit().unwrap(); } let camera_uuid = { db.lock().cameras_by_id().get(&camera_id).unwrap().uuid }; @@ -1656,8 +1843,9 @@ mod tests { // Inserting a recording should succeed and remove its uuid from the reserved table. let start = recording::Time(1430006400 * TIME_UNITS_PER_SEC); - let recording = RecordingToInsert{ - camera_id: camera_id, + let stream_id = camera_id; // TODO + let recording = RecordingToInsert { + stream_id, sample_file_bytes: 42, run_offset: 0, flags: 0, @@ -1680,19 +1868,19 @@ mod tests { vec![uuid_to_keep]); // Queries should return the correct result (with caches update on insert). - assert_single_recording(&db, camera_uuid, &recording); + assert_single_recording(&db, stream_id, &recording); // Queries on a fresh database should return the correct result (with caches populated from // existing database contents rather than built on insert). let conn = db.close(); let db = Database::new(conn).unwrap(); - assert_single_recording(&db, camera_uuid, &recording); + assert_single_recording(&db, stream_id, &recording); // Deleting a recording should succeed, update the min/max times, and re-reserve the uuid. { let mut db = db.lock(); let mut v = Vec::new(); - db.list_oldest_sample_files(camera_id, |r| { v.push(r); true }).unwrap(); + db.list_oldest_sample_files(stream_id, |r| { v.push(r); true }).unwrap(); assert_eq!(1, v.len()); let mut tx = db.tx().unwrap(); tx.delete_recordings(&v).unwrap(); diff --git a/src/dir.rs b/src/dir.rs index e23485d..32495a2 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -254,7 +254,7 @@ pub fn start_syncer(dir: Arc) } pub struct NewLimit { - pub camera_id: i32, + pub stream_id: i32, pub limit: i64, } @@ -272,30 +272,30 @@ pub fn lower_retention(dir: Arc, limits: &[NewLimit]) -> Result<( let mut to_delete = Vec::new(); for l in limits { let before = to_delete.len(); - let camera = db.cameras_by_id().get(&l.camera_id) - .ok_or_else(|| Error::new(format!("no such camera {}", l.camera_id)))?; - if l.limit >= camera.sample_file_bytes { continue } - get_rows_to_delete(db, l.camera_id, camera, camera.retain_bytes - l.limit, + let stream = db.streams_by_id().get(&l.stream_id) + .ok_or_else(|| Error::new(format!("no such stream {}", l.stream_id)))?; + if l.limit >= stream.sample_file_bytes { continue } + get_rows_to_delete(db, l.stream_id, stream, stream.retain_bytes - l.limit, &mut to_delete)?; - info!("camera {}, {}->{}, deleting {} rows", camera.short_name, - camera.sample_file_bytes, l.limit, to_delete.len() - before); + info!("stream {}, {}->{}, deleting {} rows", stream.id, + stream.sample_file_bytes, l.limit, to_delete.len() - before); } Ok(to_delete) }) } -/// Gets rows to delete to bring a camera's disk usage within bounds. -fn get_rows_to_delete(db: &db::LockedDatabase, camera_id: i32, - camera: &db::Camera, extra_bytes_needed: i64, +/// Gets rows to delete to bring a stream's disk usage within bounds. +fn get_rows_to_delete(db: &db::LockedDatabase, stream_id: i32, + stream: &db::Stream, extra_bytes_needed: i64, to_delete: &mut Vec) -> Result<(), Error> { - let bytes_needed = camera.sample_file_bytes + extra_bytes_needed - camera.retain_bytes; + let bytes_needed = stream.sample_file_bytes + extra_bytes_needed - stream.retain_bytes; let mut bytes_to_delete = 0; if bytes_needed <= 0 { - debug!("{}: have remaining quota of {}", camera.short_name, -bytes_needed); + debug!("{}: have remaining quota of {}", stream.id, -bytes_needed); return Ok(()); } let mut n = 0; - db.list_oldest_sample_files(camera_id, |row| { + db.list_oldest_sample_files(stream_id, |row| { bytes_to_delete += row.sample_file_bytes as i64; to_delete.push(row); n += 1; @@ -303,10 +303,10 @@ fn get_rows_to_delete(db: &db::LockedDatabase, camera_id: i32, })?; if bytes_needed > bytes_to_delete { return Err(Error::new(format!("{}: couldn't find enough files to delete: {} left.", - camera.short_name, bytes_needed))); + stream.id, bytes_needed))); } info!("{}: deleting {} bytes in {} recordings ({} bytes needed)", - camera.short_name, bytes_to_delete, n, bytes_needed); + stream.id, bytes_to_delete, n, bytes_needed); Ok(()) } @@ -343,12 +343,12 @@ impl Syncer { } } - /// Rotates files for all cameras and deletes stale reserved uuids from previous runs. + /// Rotates files for all streams and deletes stale reserved uuids from previous runs. fn initial_rotation(&mut self) -> Result<(), Error> { self.do_rotation(|db| { let mut to_delete = Vec::new(); - for (camera_id, camera) in db.cameras_by_id() { - get_rows_to_delete(&db, *camera_id, camera, 0, &mut to_delete)?; + for (stream_id, stream) in db.streams_by_id() { + get_rows_to_delete(&db, *stream_id, stream, 0, &mut to_delete)?; } Ok(to_delete) }) @@ -389,7 +389,7 @@ impl Syncer { fn save(&mut self, recording: db::RecordingToInsert, f: fs::File) { if let Err(e) = self.save_helper(&recording, f) { error!("camera {}: will discard recording {} due to error while saving: {}", - recording.camera_id, recording.sample_file_uuid, e); + recording.stream_id, recording.sample_file_uuid, e); self.to_unlink.push(recording.sample_file_uuid); return; } @@ -416,10 +416,10 @@ impl Syncer { let mut db = self.dir.db.lock(); let mut new_next_uuid = l.next_uuid; { - let camera = - db.cameras_by_id().get(&recording.camera_id) - .ok_or_else(|| Error::new(format!("no such camera {}", recording.camera_id)))?; - get_rows_to_delete(&db, recording.camera_id, camera, + let stream = + db.streams_by_id().get(&recording.stream_id) + .ok_or_else(|| Error::new(format!("no such stream {}", recording.stream_id)))?; + get_rows_to_delete(&db, recording.stream_id, stream, recording.sample_file_bytes as i64, &mut to_delete)?; } let mut tx = db.tx()?; @@ -490,7 +490,7 @@ struct InnerWriter<'a> { adjuster: ClockAdjuster, - camera_id: i32, + stream_id: i32, video_sample_entry_id: i32, run_offset: i32, @@ -567,20 +567,20 @@ pub struct PreviousWriter { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). - fn open(f: fs::File, uuid: Uuid, prev: Option, camera_id: i32, + fn open(f: fs::File, uuid: Uuid, prev: Option, stream_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) -> Result { Ok(Writer(Some(InnerWriter{ - syncer_channel: syncer_channel, - f: f, + syncer_channel, + f, index: recording::SampleIndexEncoder::new(), - uuid: uuid, + uuid, corrupt: false, hasher: hash::Hasher::new(hash::MessageDigest::sha1())?, prev_end: prev.map(|p| p.end_time), local_start: recording::Time(i64::max_value()), adjuster: ClockAdjuster::new(prev.map(|p| p.local_time_delta.0)), - camera_id: camera_id, - video_sample_entry_id: video_sample_entry_id, + stream_id, + video_sample_entry_id, run_offset: prev.map(|p| p.run_offset + 1).unwrap_or(0), unflushed_sample: None, }))) @@ -663,7 +663,7 @@ impl<'a> InnerWriter<'a> { else { 0 }; let local_start_delta = self.local_start - start; let recording = db::RecordingToInsert{ - camera_id: self.camera_id, + stream_id: self.stream_id, sample_file_bytes: self.index.sample_file_bytes, time: start .. end, local_time_delta: local_start_delta, diff --git a/src/json.rs b/src/json.rs index df5c9bb..1dca6bf 100644 --- a/src/json.rs +++ b/src/json.rs @@ -29,6 +29,7 @@ // along with this program. If not, see . use db; +use error::Error; use serde::ser::{SerializeMap, SerializeSeq, Serializer}; use std::collections::BTreeMap; use uuid::Uuid; @@ -41,17 +42,25 @@ pub struct TopLevel<'a> { // Use a custom serializer which presents the map's values as a sequence and includes the // "days" attribute or not, according to the bool in the tuple. #[serde(serialize_with = "TopLevel::serialize_cameras")] - pub cameras: (&'a BTreeMap, bool), + pub cameras: (&'a db::LockedDatabase, bool), } -/// JSON serialization wrapper for a single camera when processing `/cameras/` and -/// `/cameras//`. See `design/api.md` for details. +/// JSON serialization wrapper for a single camera when processing `/api/` and +/// `/api/cameras//`. See `design/api.md` for details. #[derive(Debug, Serialize)] #[serde(rename_all="camelCase")] pub struct Camera<'a> { pub uuid: Uuid, pub short_name: &'a str, pub description: &'a str, + + #[serde(serialize_with = "Camera::serialize_streams")] + pub streams: [Option>; 2], +} + +#[derive(Debug, Serialize)] +#[serde(rename_all="camelCase")] +pub struct Stream<'a> { pub retain_bytes: i64, pub min_start_time_90k: Option, pub max_end_time_90k: Option, @@ -59,26 +68,54 @@ pub struct Camera<'a> { pub total_sample_file_bytes: i64, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(serialize_with = "Camera::serialize_days")] - pub days: Option<&'a BTreeMap>, + #[serde(serialize_with = "Stream::serialize_days")] + pub days: Option<&'a BTreeMap>, } impl<'a> Camera<'a> { - pub fn new(c: &'a db::Camera, include_days: bool) -> Self { - Camera{ + pub fn wrap(c: &'a db::Camera, db: &'a db::LockedDatabase, include_days: bool) -> Result { + Ok(Camera { uuid: c.uuid, short_name: &c.short_name, description: &c.description, - retain_bytes: c.retain_bytes, - min_start_time_90k: c.range.as_ref().map(|r| r.start.0), - max_end_time_90k: c.range.as_ref().map(|r| r.end.0), - total_duration_90k: c.duration.0, - total_sample_file_bytes: c.sample_file_bytes, - days: if include_days { Some(&c.days) } else { None }, - } + streams: [ + Stream::wrap(db, c.streams[0], include_days)?, + Stream::wrap(db, c.streams[1], include_days)?, + ], + }) } - fn serialize_days(days: &Option<&BTreeMap>, + fn serialize_streams(streams: &[Option>; 2], serializer: S) -> Result + where S: Serializer { + let mut map = serializer.serialize_map(Some(streams.len()))?; + for (i, s) in streams.iter().enumerate() { + if let &Some(ref s) = s { + map.serialize_key(db::StreamType::from_index(i).expect("invalid stream type index").as_str())?; + map.serialize_value(s)?; + } + } + map.end() + } +} + +impl<'a> Stream<'a> { + fn wrap(db: &'a db::LockedDatabase, id: Option, include_days: bool) -> Result, Error> { + let id = match id { + Some(id) => id, + None => return Ok(None), + }; + let s = db.streams_by_id().get(&id).ok_or_else(|| Error::new(format!("missing stream {}", id)))?; + Ok(Some(Stream { + retain_bytes: s.retain_bytes, + min_start_time_90k: s.range.as_ref().map(|r| r.start.0), + max_end_time_90k: s.range.as_ref().map(|r| r.end.0), + total_duration_90k: s.duration.0, + total_sample_file_bytes: s.sample_file_bytes, + days: if include_days { Some(&s.days) } else { None }, + })) + } + + fn serialize_days(days: &Option<&BTreeMap>, serializer: S) -> Result where S: Serializer { let days = match *days { @@ -89,7 +126,7 @@ impl<'a> Camera<'a> { for (k, v) in days { map.serialize_key(k.as_ref())?; let bounds = k.bounds(); - map.serialize_value(&CameraDayValue{ + map.serialize_value(&StreamDayValue{ start_time_90k: bounds.start.0, end_time_90k: bounds.end.0, total_duration_90k: v.duration.0, @@ -101,7 +138,7 @@ impl<'a> Camera<'a> { #[derive(Debug, Serialize)] #[serde(rename_all="camelCase")] -struct CameraDayValue { +struct StreamDayValue { pub start_time_90k: i64, pub end_time_90k: i64, pub total_duration_90k: i64, @@ -109,12 +146,14 @@ struct CameraDayValue { impl<'a> TopLevel<'a> { /// Serializes cameras as a list (rather than a map), optionally including the `days` field. - fn serialize_cameras(cameras: &(&BTreeMap, bool), + fn serialize_cameras(cameras: &(&db::LockedDatabase, bool), serializer: S) -> Result where S: Serializer { - let mut seq = serializer.serialize_seq(Some(cameras.0.len()))?; - for c in cameras.0.values() { - seq.serialize_element(&Camera::new(c, cameras.1))?; + let (db, include_days) = *cameras; + let cs = db.cameras_by_id(); + let mut seq = serializer.serialize_seq(Some(cs.len()))?; + for (_, c) in cs { + seq.serialize_element(&Camera::wrap(c, db, include_days).unwrap())?; // TODO: no unwrap. } seq.end() } diff --git a/src/mp4.rs b/src/mp4.rs index 8635313..df91de9 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -381,7 +381,7 @@ impl Segment { self.index_once.call_once(|| { let index = unsafe { &mut *self.index.get() }; *index = db.lock() - .with_recording_playback(self.s.camera_id, self.s.recording_id, + .with_recording_playback(self.s.stream_id, self.s.recording_id, |playback| self.build_index(playback)) .map_err(|e| { error!("Unable to build index for segment: {:?}", e); }); }); @@ -629,7 +629,7 @@ impl Slice { } let truns = mp4.0.db.lock() - .with_recording_playback(s.s.camera_id, s.s.recording_id, + .with_recording_playback(s.s.stream_id, s.s.recording_id, |playback| s.truns(playback, pos, len)) .map_err(|e| { Error::new(format!("Unable to build index for segment: {:?}", e)) })?; let truns = ARefs::new(truns); @@ -762,7 +762,7 @@ impl FileBuilder { if prev.s.have_trailing_zero() { return Err(Error::new(format!( "unable to append recording {}/{} after recording {}/{} with trailing zero", - row.camera_id, row.id, prev.s.camera_id, prev.s.recording_id))); + row.stream_id, row.id, prev.s.stream_id, prev.s.recording_id))); } } let s = Segment::new(db, &row, rel_range_90k, self.next_frame_num)?; @@ -811,7 +811,7 @@ impl FileBuilder { // Update the etag to reflect this segment. let mut data = [0_u8; 24]; let mut cursor = io::Cursor::new(&mut data[..]); - cursor.write_i32::(s.s.camera_id)?; + cursor.write_i32::(s.s.stream_id)?; cursor.write_i32::(s.s.recording_id)?; cursor.write_i64::(s.s.start.0)?; cursor.write_i32::(d.start)?; @@ -1452,7 +1452,7 @@ impl FileInner { fn get_video_sample_data(&self, i: usize, r: Range) -> Result { let s = &self.segments[i]; let uuid = { - self.db.lock().with_recording_playback(s.s.camera_id, s.s.recording_id, + self.db.lock().with_recording_playback(s.s.stream_id, s.s.recording_id, |p| Ok(p.sample_file_uuid))? }; let f = self.dir.open_sample_file(uuid)?; @@ -1541,7 +1541,7 @@ mod tests { use strutil; use super::*; use stream::{self, Opener, Stream}; - use testutil::{self, TestDb, TEST_CAMERA_ID}; + use testutil::{self, TestDb, TEST_STREAM_ID}; fn fill_slice(slice: &mut [u8], e: &E, start: u64) { let mut p = 0; @@ -1765,7 +1765,7 @@ mod tests { extra_data.width, extra_data.height, extra_data.sample_entry, extra_data.rfc6381_codec).unwrap(); let mut output = db.dir.create_writer(&db.syncer_channel, None, - TEST_CAMERA_ID, video_sample_entry_id).unwrap(); + TEST_STREAM_ID, video_sample_entry_id).unwrap(); // end_pts is the pts of the end of the most recent frame (start + duration). // It's needed because dir::Writer calculates a packet's duration from its pts and the @@ -1799,7 +1799,7 @@ mod tests { let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); { let db = db.lock(); - db.list_recordings_by_time(TEST_CAMERA_ID, all_time, |r| { + db.list_recordings_by_time(TEST_STREAM_ID, all_time, |r| { let d = r.duration_90k; assert!(skip_90k + shorten_90k < d); builder.append(&*db, r, skip_90k .. d - shorten_90k).unwrap(); @@ -2259,7 +2259,7 @@ mod bench { let segment = { let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); let mut row = None; - db.list_recordings_by_time(testutil::TEST_CAMERA_ID, all_time, |r| { + db.list_recordings_by_time(testutil::TEST_STREAM_ID, all_time, |r| { row = Some(r); Ok(()) }).unwrap(); @@ -2267,7 +2267,7 @@ mod bench { let rel_range_90k = 0 .. row.duration_90k; super::Segment::new(&db, &row, rel_range_90k, 1).unwrap() }; - db.with_recording_playback(segment.s.camera_id, segment.s.recording_id, |playback| { + db.with_recording_playback(segment.s.stream_id, segment.s.recording_id, |playback| { let v = segment.build_index(playback).unwrap(); // warm. b.bytes = v.len() as u64; // define the benchmark performance in terms of output bytes. b.iter(|| segment.build_index(playback).unwrap()); diff --git a/src/recording.rs b/src/recording.rs index cb2c49e..4547909 100644 --- a/src/recording.rs +++ b/src/recording.rs @@ -354,7 +354,7 @@ impl SampleIndexEncoder { /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. #[derive(Debug)] pub struct Segment { - pub camera_id: i32, + pub stream_id: i32, pub recording_id: i32, pub start: Time, @@ -381,8 +381,8 @@ impl Segment { pub fn new(db: &db::LockedDatabase, recording: &db::ListRecordingsRow, desired_range_90k: Range) -> Result { - let mut self_ = Segment{ - camera_id: recording.camera_id, + let mut self_ = Segment { + stream_id: recording.stream_id, recording_id: recording.id, start: recording.start, begin: None, @@ -413,7 +413,7 @@ impl Segment { // Slow path. Need to iterate through the index. trace!("recording::Segment::new slow path, desired_range_90k={:?}, recording={:#?}", self_.desired_range_90k, recording); - db.with_recording_playback(self_.camera_id, self_.recording_id, |playback| { + db.with_recording_playback(self_.stream_id, self_.recording_id, |playback| { let mut begin = Box::new(SampleIndexIterator::new()); let data = &(&playback).video_index; let mut it = SampleIndexIterator::new(); @@ -481,7 +481,7 @@ impl Segment { pub fn foreach(&self, playback: &db::RecordingPlayback, mut f: F) -> Result<(), Error> where F: FnMut(&SampleIndexIterator) -> Result<(), Error> { trace!("foreach on recording {}/{}: {} frames, actual_start_90k: {}", - self.camera_id, self.recording_id, self.frames, self.actual_start_90k()); + self.stream_id, self.recording_id, self.frames, self.actual_start_90k()); let data = &(&playback).video_index; let mut it = match self.begin { Some(ref b) => **b, @@ -490,11 +490,11 @@ impl Segment { if it.uninitialized() { if !it.next(data)? { return Err(Error::new(format!("recording {}/{}: no frames", - self.camera_id, self.recording_id))); + self.stream_id, self.recording_id))); } if !it.is_key() { return Err(Error::new(format!("recording {}/{}: doesn't start with key frame", - self.camera_id, self.recording_id))); + self.stream_id, self.recording_id))); } } let mut have_frame = true; @@ -502,7 +502,7 @@ impl Segment { for i in 0 .. self.frames { if !have_frame { return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}", - self.camera_id, self.recording_id, self.frames, + self.stream_id, self.recording_id, self.frames, i+1))); } if it.is_key() { @@ -510,7 +510,7 @@ impl Segment { if key_frame > self.key_frames { return Err(Error::new(format!( "recording {}/{}: more than expected {} key frames", - self.camera_id, self.recording_id, self.key_frames))); + self.stream_id, self.recording_id, self.key_frames))); } } @@ -522,7 +522,7 @@ impl Segment { } if key_frame < self.key_frames { return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}", - self.camera_id, self.recording_id, self.key_frames, + self.stream_id, self.recording_id, self.key_frames, key_frame))); } Ok(()) @@ -656,7 +656,7 @@ mod tests { fn get_frames(db: &db::Database, segment: &Segment, f: F) -> Vec where F: Fn(&SampleIndexIterator) -> T { let mut v = Vec::new(); - db.lock().with_recording_playback(segment.camera_id, segment.recording_id, |playback| { + db.lock().with_recording_playback(segment.stream_id, segment.recording_id, |playback| { segment.foreach(playback, |it| { v.push(f(it)); Ok(()) }) }).unwrap(); v diff --git a/src/schema.sql b/src/schema.sql index fe55230..5e643af 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -63,31 +63,34 @@ create table camera ( username text, -- The password to use when accessing the camera. - password text, + password text +); - -- The path (starting with "/") to use in rtsp:// URLs to reference this - -- camera's "main" (full-quality) video stream. - main_rtsp_path text, +create table stream ( + id integer primary key, + camera_id integer not null references camera (id), + type text not null check (type in ('main', 'sub')), - -- The path (starting with "/") to use in rtsp:// URLs to reference this - -- camera's "sub" (low-bandwidth) video stream. - sub_rtsp_path text, + -- The path (starting with "/") to use in rtsp:// URLs to for this stream. + rtsp_path text not null, -- The number of bytes of video to retain, excluding the currently-recording -- file. Older files will be deleted as necessary to stay within this limit. retain_bytes integer not null check (retain_bytes >= 0), - -- The low 32 bits of the next recording id to assign for this camera. + -- The low 32 bits of the next recording id to assign for this stream. -- Typically this is the maximum current recording + 1, but it does -- not decrease if that recording is deleted. - next_recording_id integer not null check (next_recording_id >= 0) + next_recording_id integer not null check (next_recording_id >= 0), + + unique (camera_id, type) ); -- Each row represents a single completed recorded segment of video. -- Recordings are typically ~60 seconds; never more than 5 minutes. create table recording ( - -- The high 32 bits of composite_id are taken from the camera's id, which - -- improves locality. The low 32 bits are taken from the camera's + -- The high 32 bits of composite_id are taken from the stream's id, which + -- improves locality. The low 32 bits are taken from the stream's -- next_recording_id (which should be post-incremented in the same -- transaction). It'd be simpler to use a "without rowid" table and separate -- fields to make up the primary key, but @@ -98,7 +101,7 @@ create table recording ( -- This field is redundant with id above, but used to enforce the reference -- constraint and to structure the recording_start_time index. - camera_id integer not null references camera (id), + stream_id integer not null references stream (id), -- The offset of this recording within a run. 0 means this was the first -- recording made from a RTSP session. The start of the run has id @@ -135,12 +138,12 @@ create table recording ( video_sync_samples integer not null check (video_sync_samples > 0), video_sample_entry_id integer references video_sample_entry (id), - check (composite_id >> 32 = camera_id) + check (composite_id >> 32 = stream_id) ); create index recording_cover on recording ( - -- Typical queries use "where camera_id = ? order by start_time_90k". - camera_id, + -- Typical queries use "where stream_id = ? order by start_time_90k". + stream_id, start_time_90k, -- These fields are not used for ordering; they cover most queries so @@ -202,4 +205,4 @@ create table video_sample_entry ( ); insert into version (id, unix_time, notes) - values (1, cast(strftime('%s', 'now') as int), 'db creation'); + values (2, cast(strftime('%s', 'now') as int), 'db creation'); diff --git a/src/streamer.rs b/src/streamer.rs index 73702f4..61054d5 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -29,7 +29,7 @@ // along with this program. If not, see . use clock::{Clocks, TimerGuard}; -use db::{Camera, Database}; +use db::{Camera, Database, Stream}; use dir; use error::Error; use h264; @@ -62,7 +62,7 @@ pub struct Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { syncer_channel: dir::SyncerChannel, clocks: &'a C, opener: &'a stream::Opener, - camera_id: i32, + stream_id: i32, short_name: String, url: String, redacted_url: String, @@ -77,9 +77,9 @@ struct WriterState<'a> { impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { pub fn new<'b>(env: &Environment<'a, 'b, C, S>, syncer_channel: dir::SyncerChannel, - camera_id: i32, c: &Camera, rotate_offset_sec: i64, + stream_id: i32, c: &Camera, s: &Stream, rotate_offset_sec: i64, rotate_interval_sec: i64) -> Self { - Streamer{ + Streamer { shutdown: env.shutdown.clone(), rotate_offset_sec: rotate_offset_sec, rotate_interval_sec: rotate_interval_sec, @@ -88,10 +88,10 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { syncer_channel: syncer_channel, clocks: env.clocks, opener: env.opener, - camera_id: camera_id, - short_name: c.short_name.to_owned(), - url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, c.main_rtsp_path), - redacted_url: format!("rtsp://{}:redacted@{}{}", c.username, c.host, c.main_rtsp_path), + stream_id: stream_id, + short_name: format!("{}-{}", c.short_name, s.type_.as_str()), + url: format!("rtsp://{}:{}@{}{}", c.username, c.password, c.host, s.rtsp_path), + redacted_url: format!("rtsp://{}:redacted@{}{}", c.username, c.host, s.rtsp_path), } } @@ -167,7 +167,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clocks, S: 'a + stream::Stream { let r = r + if prev.is_none() { self.rotate_interval_sec } else { 0 }; let _t = TimerGuard::new(self.clocks, || "creating writer"); - let w = self.dir.create_writer(&self.syncer_channel, prev, self.camera_id, + let w = self.dir.create_writer(&self.syncer_channel, prev, self.stream_id, video_sample_entry_id)?; WriterState{ writer: w, @@ -358,8 +358,9 @@ mod tests { { let l = db.db.lock(); let camera = l.cameras_by_id().get(&testutil::TEST_CAMERA_ID).unwrap(); - stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_CAMERA_ID, - camera, 0, 3); + let s = l.streams_by_id().get(&testutil::TEST_STREAM_ID).unwrap(); + stream = super::Streamer::new(&env, db.syncer_channel.clone(), testutil::TEST_STREAM_ID, + camera, s, 0, 3); } stream.run(); assert!(opener.streams.lock().unwrap().is_empty()); @@ -370,7 +371,7 @@ mod tests { // 3-second boundaries (such as 2016-04-26 00:00:03), rotation happens somewhat later: // * the first rotation is always skipped // * the second rotation is deferred until a key frame. - assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 1), &[ + assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 1), &[ Frame{start_90k: 0, duration_90k: 90379, is_key: true}, Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, @@ -380,12 +381,12 @@ mod tests { Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, // pts_time 6.0001... Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, ]); - assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 2), &[ + assert_eq!(get_frames(&db, testutil::TEST_STREAM_ID, 2), &[ Frame{start_90k: 0, duration_90k: 90011, is_key: true}, Frame{start_90k: 90011, duration_90k: 0, is_key: false}, ]); let mut recordings = Vec::new(); - db.list_recordings_by_id(testutil::TEST_CAMERA_ID, 1..3, |r| { + db.list_recordings_by_id(testutil::TEST_STREAM_ID, 1..3, |r| { recordings.push(r); Ok(()) }).unwrap(); diff --git a/src/testutil.rs b/src/testutil.rs index ec2eae7..d4d8793 100644 --- a/src/testutil.rs +++ b/src/testutil.rs @@ -45,6 +45,7 @@ static INIT: sync::Once = sync::ONCE_INIT; /// id of the camera created by `TestDb::new` below. pub const TEST_CAMERA_ID: i32 = 1; +pub const TEST_STREAM_ID: i32 = 1; /// Performs global initialization for tests. /// * set up logging. (Note the output can be confusing unless `RUST_TEST_THREADS=1` is set in @@ -89,12 +90,14 @@ impl TestDb { host: "test-camera".to_owned(), username: "foo".to_owned(), password: "bar".to_owned(), - main_rtsp_path: "/main".to_owned(), - sub_rtsp_path: "/sub".to_owned(), + rtsp_paths: [ + "/main".to_owned(), + "/sub".to_owned(), + ], }).unwrap()); test_camera_uuid = l.cameras_by_id().get(&TEST_CAMERA_ID).unwrap().uuid; let mut tx = l.tx().unwrap(); - tx.update_retention(TEST_CAMERA_ID, 1048576).unwrap(); + tx.update_retention(TEST_STREAM_ID, 1048576).unwrap(); tx.commit().unwrap(); } let path = tmpdir.path().to_str().unwrap().to_owned(); @@ -121,7 +124,7 @@ impl TestDb { tx.bypass_reservation_for_testing = true; const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); row_id = tx.insert_recording(&db::RecordingToInsert{ - camera_id: TEST_CAMERA_ID, + stream_id: TEST_STREAM_ID, sample_file_bytes: encoder.sample_file_bytes, time: START_TIME .. START_TIME + recording::Duration(encoder.total_duration_90k as i64), @@ -138,7 +141,7 @@ impl TestDb { tx.commit().unwrap(); } let mut row = None; - db.list_recordings_by_id(TEST_CAMERA_ID, row_id .. row_id + 1, + db.list_recordings_by_id(TEST_STREAM_ID, row_id .. row_id + 1, |r| { row = Some(r); Ok(()) }).unwrap(); row.unwrap() } @@ -155,7 +158,7 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { const START_TIME: recording::Time = recording::Time(1430006400i64 * TIME_UNITS_PER_SEC); const DURATION: recording::Duration = recording::Duration(5399985); let mut recording = db::RecordingToInsert{ - camera_id: TEST_CAMERA_ID, + stream_id: TEST_STREAM_ID, sample_file_bytes: 30104460, flags: 0, time: START_TIME .. (START_TIME + DURATION), diff --git a/src/web.rs b/src/web.rs index d8ce81d..b0eac70 100644 --- a/src/web.rs +++ b/src/web.rs @@ -66,13 +66,13 @@ lazy_static! { } enum Path { - TopLevel, // "/api/" - InitSegment([u8; 20]), // "/api/init/.mp4" - Camera(Uuid), // "/api/cameras//" - CameraRecordings(Uuid), // "/api/cameras//recordings" - CameraViewMp4(Uuid), // "/api/cameras//view.mp4" - CameraViewMp4Segment(Uuid), // "/api/cameras//view.m4s" - Static, // "" + TopLevel, // "/api/" + InitSegment([u8; 20]), // "/api/init/.mp4" + Camera(Uuid), // "/api/cameras//" + StreamRecordings(Uuid, db::StreamType), // "/api/cameras///recordings" + StreamViewMp4(Uuid, db::StreamType), // "/api/cameras///view.mp4" + StreamViewMp4Segment(Uuid, db::StreamType), // "/api/cameras///view.m4s" + Static, // "" NotFound, } @@ -101,18 +101,33 @@ fn decode_path(path: &str) -> Path { None => { return Path::NotFound; }, Some(s) => s, }; - let (uuid, path) = path.split_at(slash); + let uuid = &path[0 .. slash]; + let path = &path[slash+1 .. ]; // TODO(slamb): require uuid to be in canonical format. let uuid = match Uuid::parse_str(uuid) { Ok(u) => u, Err(_) => { return Path::NotFound }, }; + + if path.is_empty() { + return Path::Camera(uuid); + } + + let slash = match path.find('/') { + None => { return Path::NotFound; }, + Some(s) => s, + }; + let (type_, path) = path.split_at(slash); + + let type_ = match db::StreamType::parse(type_) { + None => { return Path::NotFound; }, + Some(t) => t, + }; match path { - "/" => Path::Camera(uuid), - "/recordings" => Path::CameraRecordings(uuid), - "/view.mp4" => Path::CameraViewMp4(uuid), - "/view.m4s" => Path::CameraViewMp4Segment(uuid), + "/recordings" => Path::StreamRecordings(uuid, type_), + "/view.mp4" => Path::StreamViewMp4(uuid, type_), + "/view.m4s" => Path::StreamViewMp4Segment(uuid, type_), _ => Path::NotFound, } } @@ -200,7 +215,7 @@ impl ServiceInner { let db = self.db.lock(); serde_json::to_writer(&mut w, &json::TopLevel { time_zone_name: &self.time_zone_name, - cameras: (db.cameras_by_id(), days), + cameras: (&db, days), })?; } Ok(resp) @@ -212,12 +227,12 @@ impl ServiceInner { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - serde_json::to_writer(&mut w, &json::Camera::new(camera, true))? + serde_json::to_writer(&mut w, &json::Camera::wrap(camera, &db, true)?)? }; Ok(resp) } - fn camera_recordings(&self, req: &Request, uuid: Uuid) + fn stream_recordings(&self, req: &Request, uuid: Uuid, type_: db::StreamType) -> Result, Error> { let (r, split) = { let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); @@ -240,7 +255,9 @@ impl ServiceInner { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - db.list_aggregated_recordings(camera.id, r, split, |row| { + let stream_id = camera.streams[type_.index()] + .ok_or_else(|| Error::new("no such stream".to_owned()))?; + db.list_aggregated_recordings(stream_id, r, split, |row| { let end = row.ids.end - 1; // in api, ids are inclusive. out.recordings.push(json::Recording { start_id: row.ids.start, @@ -276,23 +293,23 @@ impl ServiceInner { self.not_found() } - fn camera_view_mp4(&self, uuid: Uuid, type_: mp4::Type, query: Option<&str>, req: &Request) - -> Result, Error> { - let camera_id = { + fn stream_view_mp4(&self, req: &Request, uuid: Uuid, stream_type_: db::StreamType, + mp4_type_: mp4::Type) -> Result, Error> { + let stream_id = { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - camera.id + camera.streams[stream_type_.index()].ok_or_else(|| Error::new("no such stream".to_owned()))? }; - let mut builder = mp4::FileBuilder::new(type_); - if let Some(q) = query { + let mut builder = mp4::FileBuilder::new(mp4_type_); + if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { let (key, value) = (key.borrow(), value.borrow()); match key { "s" => { let s = Segments::parse(value).map_err( |_| Error::new(format!("invalid s parameter: {}", value)))?; - debug!("camera_view_mp4: appending s={:?}", s); + debug!("stream_view_mp4: appending s={:?}", s); let mut est_segments = (s.ids.end - s.ids.start) as usize; if let Some(end) = s.end_time { // There should be roughly ceil((end - start) / @@ -309,15 +326,15 @@ impl ServiceInner { let db = self.db.lock(); let mut prev = None; let mut cur_off = 0; - db.list_recordings_by_id(camera_id, s.ids.clone(), |r| { + db.list_recordings_by_id(stream_id, s.ids.clone(), |r| { // Check for missing recordings. match prev { None if r.id == s.ids.start => {}, None => return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.start))), + stream_id, s.ids.start))), Some(id) if r.id != id + 1 => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, id + 1))); + stream_id, id + 1))); }, _ => {}, }; @@ -330,11 +347,11 @@ impl ServiceInner { let start = cmp::max(0, s.start_time - cur_off); let end = cmp::min(d, end_time - cur_off); let times = start as i32 .. end as i32; - debug!("...appending recording {}/{} with times {:?} (out of dur {})", - r.camera_id, r.id, times, d); + debug!("...appending recording {}/{} with times {:?} \ + (out of dur {})", r.stream_id, r.id, times, d); builder.append(&db, r, start as i32 .. end as i32)?; } else { - debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d); + debug!("...skipping recording {}/{} dur {}", r.stream_id, r.id, d); } cur_off += d; Ok(()) @@ -344,11 +361,11 @@ impl ServiceInner { match prev { Some(id) if s.ids.end != id + 1 => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.end - 1))); + stream_id, s.ids.end - 1))); }, None => { return Err(Error::new(format!("no such recording {}/{}", - camera_id, s.ids.start))); + stream_id, s.ids.start))); }, _ => {}, }; @@ -451,12 +468,12 @@ impl server::Service for Service { Path::InitSegment(sha1) => self.0.init_segment(sha1, &req), Path::TopLevel => self.0.top_level(&req), Path::Camera(uuid) => self.0.camera(&req, uuid), - Path::CameraRecordings(uuid) => self.0.camera_recordings(&req, uuid), - Path::CameraViewMp4(uuid) => { - self.0.camera_view_mp4(uuid, mp4::Type::Normal, req.uri().query(), &req) + Path::StreamRecordings(uuid, type_) => self.0.stream_recordings(&req, uuid, type_), + Path::StreamViewMp4(uuid, type_) => { + self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::Normal) }, - Path::CameraViewMp4Segment(uuid) => { - self.0.camera_view_mp4(uuid, mp4::Type::MediaSegment, req.uri().query(), &req) + Path::StreamViewMp4Segment(uuid, type_) => { + self.0.stream_view_mp4(&req, uuid, type_, mp4::Type::MediaSegment) }, Path::NotFound => self.0.not_found(), Path::Static => self.0.static_file(&req), @@ -539,10 +556,10 @@ mod bench { } #[bench] - fn serve_camera_recordings(b: &mut Bencher) { + fn serve_stream_recordings(b: &mut Bencher) { testutil::init(); let server = &*SERVER; - let url = reqwest::Url::parse(&format!("{}/api/cameras/{}/recordings", server.base_url, + let url = reqwest::Url::parse(&format!("{}/api/cameras/{}/main/recordings", server.base_url, server.test_camera_uuid)).unwrap(); let mut buf = Vec::new(); let client = reqwest::Client::new(); diff --git a/ui-src/index.html b/ui-src/index.html index c3b4590..ce6bfc5 100644 --- a/ui-src/index.html +++ b/ui-src/index.html @@ -40,8 +40,10 @@