From eee887b9a6a44fb96eae0c2c3d0bde1e22aa3064 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Tue, 20 Dec 2016 22:08:18 -0800 Subject: [PATCH] schema version 1 The advantages of the new schema are: * overlapping recordings can be unambiguously described and viewed. This is a significant problem right now; the clock on my cameras appears to run faster than the (NTP-synchronized) clock on my NVR. Thus, if an RTSP session drops and is quickly reconnected, there's likely to be overlap. * less I/O is required to view mp4s when there are multiple cameras. This is a pretty dramatic difference in the number of database read syscalls with pragma page_size = 1024 (605 -> 39 in one test), although I'm not sure how much of that maps to actual I/O wait time. That's probably as dramatic as it is due to overflow page chaining. But even with larger page sizes, there's an improvement. It helps to stop interleaving the video_index fields from different cameras. There are changes to the JSON API to take advantage of this, described in design/api.md. There's an upgrade procedure, described in guide/schema.md. --- README.md | 54 +++--- design/api.md | 80 +++++---- guide/schema.md | 140 +++++++++++++-- src/db.rs | 387 ++++++++++++++++++++++++++++------------ src/dir.rs | 15 +- src/main.rs | 41 ++++- src/mp4.rs | 40 +++-- src/recording.rs | 43 +++-- src/schema.sql | 60 +++++-- src/streamer.rs | 13 +- src/testutil.rs | 12 +- src/upgrade/mod.rs | 94 ++++++++++ src/upgrade/v0_to_v1.rs | 235 ++++++++++++++++++++++++ src/web.rs | 250 +++++++++++++++++--------- 14 files changed, 1121 insertions(+), 343 deletions(-) create mode 100644 src/upgrade/mod.rs create mode 100644 src/upgrade/v0_to_v1.rs diff --git a/README.md b/README.md index 4997c24..a2baf97 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,8 @@ support for motion detection, no authentication, and no config UI. This is version 0.1, the initial release. Until version 1.0, there will be no compatibility guarantees: configuration and storage formats may change from -version to version. +version to version. There is an [upgrade procedure](guide/schema.md) but it is +not for the faint of heart. I hope to add features such as salient motion detection. It's way too early to make promises, but it seems possible to build a full-featured @@ -209,11 +210,12 @@ each camera you insert using this method. $ sudo -u moonfire-nvr sqlite3 ~moonfire-nvr/db/db sqlite3> insert into camera ( ...> uuid, short_name, description, host, username, password, - ...> main_rtsp_path, sub_rtsp_path, retain_bytes) values ( + ...> main_rtsp_path, sub_rtsp_path, retain_bytes, + ...> next_recording_id) values ( ...> X'b47f48706d91414591cd6c931bf836b4', 'driveway', ...> 'Longer description of this camera', '192.168.1.101', ...> 'admin', '12345', '/Streaming/Channels/1', - ...> '/Streaming/Channels/2', 104857600); + ...> '/Streaming/Channels/2', 104857600, 0); sqlite3> ^D ### Using automatic camera configuration inclusion with `prep.sh` @@ -226,29 +228,29 @@ for easy reading, and editing, and does not have to be altered in formatting, but can if you wish and know what you are doing): insert into camera ( - uuid, - short_name, description, - host, username, password, - main_rtsp_path, sub_rtsp_path, - retain_bytes - ) - values - ( - X'1c944181b8074b8083eb579c8e194451', - 'Front Left', 'Front Left Driveway', - '192.168.1.41', - 'admin', 'secret', - '/Streaming/Channels/1', '/Streaming/Channels/2', - 346870912000 - ), - ( - X'da5921f493ac4279aafe68e69e174026', - 'Front Right', 'Front Right Driveway', - '192.168.1.42', - 'admin', 'secret', - '/Streaming/Channels/1', '/Streaming/Channels/2', - 346870912000 - ); + uuid, + short_name, description, + host, username, password, + main_rtsp_path, sub_rtsp_path, + retain_bytes, next_recording_id + ) + values + ( + X'1c944181b8074b8083eb579c8e194451', + 'Front Left', 'Front Left Driveway', + '192.168.1.41', + 'admin', 'secret', + '/Streaming/Channels/1', '/Streaming/Channels/2', + 346870912000, 0 + ), + ( + X'da5921f493ac4279aafe68e69e174026', + 'Front Right', 'Front Right Driveway', + '192.168.1.42', + 'admin', 'secret', + '/Streaming/Channels/1', '/Streaming/Channels/2', + 346870912000, 0 + ); You'll still have to find the correct rtsp paths, usernames and passwords, and set retained byte counts, as explained above. diff --git a/design/api.md b/design/api.md index dc62cff..bdd6474 100644 --- a/design/api.md +++ b/design/api.md @@ -125,31 +125,28 @@ Valid request parameters: TODO(slamb): once we support annotations, should they be included in the same URI or as a separate `/annotations`? -TODO(slamb): There might be some irregularity in the order if there are -overlapping recordings (such as if the server's clock jumped while running) -but I haven't thought about the details. In general, I'm not really sure how -to handle this case, other than ideally to keep recording stuff no matter what -and present some UI to help the user to fix it after the -fact. +In the property `recordings`, returns a list of recordings in arbitrary order. +Each recording object has the following properties: -In the property `recordings`, returns a list of recordings. Each recording -object has the following properties: - -* `start_time_90k` -* `end_time_90k` +* `start_id`. The id of this recording, which can be used with `/view.mp4` + to retrieve its content. +* `end_id` (optional). If absent, this object describes a single recording. + If present, this indicates that recordings `start_id-end_id` (inclusive) + together are as described. Adjacent recordings from the same RTSP session + may be coalesced in this fashion to reduce the amount of redundant data + transferred. +* `start_time_90k`: the start time of the given recording. Note this may be + less than the requested `start_time_90k` if this recording was ongoing + at the requested time. +* `end_time_90k`: the end time of the given recording. Note this may be + greater than the requested `end_time_90k` if this recording was ongoing at + the requested time. * `sample_file_bytes` * `video_sample_entry_sha1` * `video_sample_entry_width` * `video_sample_entry_height` * `video_samples`: the number of samples (aka frames) of video in this recording. -* TODO: recording id(s)? interior split points for coalesced recordings? - -Recordings may be coalesced if they are adjacent and have the same -`video_sample_entry_*` data. That is, if recording A spans times [t, u) and -recording B spans times [u, v), they may be returned as a single recording -AB spanning times [t, v). Arbitrarily many recordings may be coalesced in this -fashion. Example request URI (with added whitespace between parameters): @@ -165,8 +162,9 @@ Example response: { "recordings": [ { - "end_time_90k": 130985466591817, + "start_id": 1, "start_time_90k": 130985461191810, + "end_time_90k": 130985466591817, "sample_file_bytes": 8405564, "video_sample_entry_sha1": "81710c9c51a02cc95439caa8dd3bc12b77ffe767", "video_sample_entry_width": 1280, @@ -184,18 +182,38 @@ Example response: ### `/camera//view.mp4` -A GET returns a .mp4 file, with an etag and support for range requests. +A GET returns a `.mp4` file, with an etag and support for range requests. Expected query parameters: -* `start_time_90k` -* `end_time_90k` -* `ts`: should be set to `true` to request a subtitle track be added with - human-readable recording timestamps. -* TODO(slamb): possibly `overlap` to indicate what to do about segments of - recording with overlapping wall times. Values might include: - * `error` (return an HTTP error) - * `include_all` (include all, in order of the recording ids) - * `include_latest` (include only the latest by recording id for a - particular segment of time) -* TODO(slamb): gaps allowed or not? maybe a parameter for this also? +* `s` (one or more): a string of the form + `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. This specifies + recording segments to include. The produced `.mp4` file will be a + concatenation of the segments indicated by all `s` parameters. The ids to + retrieve are as returned by the `/recordings` URL. The optional start and + end times are in 90k units and relative to the start of the first specified + id. These can be used to clip the returned segments. Note they can be used + to skip over some ids entirely; this is allowed so that the caller doesn't + need to know the start time of each interior id. +* `ts` (optional): should be set to `true` to request a subtitle track be + added with human-readable recording timestamps. + +Example request URI to retrieve all of recording id 1 from the given camera: + +``` + /camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1 +``` + +Example request URI to retrieve all of recording ids 1–5 from the given camera, +with timestamp subtitles: + +``` + /camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1-5&ts=true +``` + +Example request URI to retrieve recording id 1, skipping its first 26 +90,000ths of a second: + +``` + /camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1.26 +``` diff --git a/guide/schema.md b/guide/schema.md index d54eb8c..7d73e08 100644 --- a/guide/schema.md +++ b/guide/schema.md @@ -15,33 +15,132 @@ The database schema includes a version number to quickly identify if a the database is compatible with a particular version of the software. Some software upgrades will require you to upgrade the database. -### Unversioned to version 0 +Note that in general upgrades are one-way and backward-incompatible. That is, +you can't downgrade the database to the old version, and you can't run the old +software on the new database. To minimize the corresponding risk, you should +save a backup of the old SQLite database and verify the new software works in +read-only mode prior to deleting the old database. -Early versions of Moonfire NVR did not include the version information in the -schema. You can manually add this information to your schema using the -`sqlite3` commandline. This process is backward compatible, meaning that -software versions that accept an unversioned database will also accept a -version 0 database. +### Procedure -Version 0 makes two changes: +First ensure there is sufficient space available for three copies of the +SQLite database: - * schema versioning, as described above. - * adding a column (`video_sync_samples`) to a database index to speed up - certain operations. + # the primary copy, which will be upgraded + # a copy you create manually as a backup so that you can restore if you + discover a problem while running the new software against the upgraded + database in read-only mode. If disk space is tight, you can save this + to a different filesystem than the primary copy. + # an internal copy made and destroyed by Moonfire NVR and SQLite during the + upgrade: + * a write-ahead log or rollback journal during earlier stages + * a complete database copy during the final vacuum step + If disk space is tight, and you are _very careful_, you can skip these + copies with the `--preset-journal=off --no-vacuum` arguments to + the updater. If you aren't confident in your ability to do this, *don't + do it*. If you are confident, take additional safety precautions anyway: + * double-check you have the full backup described above. Without the + journal any problems during the upgrade will corrupt your database + and you will need to restore. + * ensure you re-enable journalling via `pragma journal_mode = wal;` + before using the upgraded database, or any problems after the + upgrade will corrupt your database. The upgrade procedure should do + this automatically, but you will want to verify by hand that you are + no longer in the dangerous mode. -First ensure Moonfire NVR is not running; if you are using systemd with the +Next ensure Moonfire NVR is not running and does not automatically restart if +the system is rebooted during the upgrade. If you are using systemd with the service name `moonfire-nvr`, you can do this as follows: $ sudo systemctl stop moonfire-nvr + $ sudo systemctl disable moonfire-nvr The service takes a moment to shut down; wait until the following command reports that it is not running: $ sudo systemctl status moonfire-nvr -Then use `sqlite3` to manually edit the database. The default path is -`/var/lib/moonfire-nvr/db/db`; if you've specified a different `--db_dir`, -use that directory with a suffix of `/db`. +Then back up your SQLite database. If you are using the default path, you can +do so as follows: + + $ sudo -u moonfire-nvr cp /var/lib/moonfire-nvr/db/db{,.pre-upgrade} + +By default, the upgrade command will reset the SQLite `journal_mode` to +`delete` prior to the upgrade. This works around a problem with +`journal_mode = wal` in older SQLite versions, as documented in [the SQLite +manual for write-ahead logging](https://www.sqlite.org/wal.html): + +> WAL works best with smaller transactions. WAL does not work well for very +> large transactions. For transactions larger than about 100 megabytes, +> traditional rollback journal modes will likely be faster. For transactions +> in excess of a gigabyte, WAL mode may fail with an I/O or disk-full error. +> It is recommended that one of the rollback journal modes be used for +> transactions larger than a few dozen megabytes. Beginning with version +> 3.11.0 (2016-02-15), WAL mode works as efficiently with large transactions +> as does rollback mode. + +Run the upgrade procedure using the new software binary (here referred to as +`new-moonfire-nvr`; if you are installing from source, you may find it as +`target/release/moonfire-nvr`). + + $ sudo -u moonfire-nvr RUST_LOG=info new-moonfire-nvr --upgrade + +Then run the system in read-only mode to verify correct operation: + + $ sudo -u moonfire-nvr new-moonfire-nvr --read-only + +Go to the web interface and ensure the system is operating correctly. If +you detect a problem now, you can copy the old database back over the new one. +If you detect a problem after enabling read-write operation, a restore will be +more complicated. + +Then install the new software to the path expected by your systemd +configuration and start it up: + + $ sudo install -m 755 new-moonfire-nvr /usr/local/bin/moonfire-nvr + $ sudo systemctl enable moonfire-nvr + $ sudo systemctl start moonfire-nvr + +Hopefully your system is functioning correctly. If not, there are two options +for restore; neither are easy: + + * go back to your old database. There will be two classes of problems: + * If the new system deleted any recordings, the old system will + incorrectly believe they are still present. You could wait until all + existing files are rotated away, or you could try to delete them + manually from the database. + * if the new system created any recordings, the old system will not + know about them and will not delete them. Your disk may become full. + You should find some way to discover these files and manually delete + them. + +Once you're confident of correct operation, delete the unneeded backup: + + $ sudo systemctl rm /var/lib/moonfire-nvr/db/db.pre-upgrade + +### Unversioned to version 0 + +Early versions of Moonfire NVR (prior to 2016-12-20) did not include the +version information in the schema. You can manually add this information to +your schema using the `sqlite3` commandline. This process is backward +compatible, meaning that software versions that accept an unversioned database +will also accept a version 0 database. + +Version 0 makes two changes: + + * it adds schema versioning, as described above. + * it adds a column (`video_sync_samples`) to a database index to speed up + certain operations. + +There's a special procedure for this upgrade. The good news is that a backup +is unnecessary; there's no risk with this procedure. + +First ensure Moonfire NVR is not running as described in the general procedure +above. + +Then use `sqlite3` to manually edit the database. The default +path is `/var/lib/moonfire-nvr/db/db`; if you've specified a different +`--db_dir`, use that directory with a suffix of `/db`. $ sudo -u moonfire-nvr sqlite3 /var/lib/moonfire-nvr/db/db sqlite3> @@ -74,6 +173,15 @@ create index recording_cover on recording ( commit transaction; ``` -When you are done, you can restart the service: +When you are done, you can restart the service via `systemctl` and continue +using it with your existing or new version of Moonfire NVR. - $ sudo systemctl start moonfire-nvr +### Version 0 to version 1 + +Version 1 makes several changes to the recording tables and indices. These +changes allow overlapping recordings to be unambiguously listed and viewed. +They also reduce the amount of I/O; in one test of retrieving playback +indexes, the number of (mostly 1024-byte) read syscalls on the database +dropped from 605 to 39. + +The general upgrade procedure applies to this upgrade. diff --git a/src/db.rs b/src/db.rs index 0a3a2f4..408b237 100644 --- a/src/db.rs +++ b/src/db.rs @@ -49,7 +49,7 @@ //! and such to avoid database operations in these paths. //! //! * the `Transaction` interface allows callers to batch write operations to reduce latency and -//! SSD write samples. +//! SSD write cycles. use error::{Error, ResultExt}; use fnv; @@ -70,17 +70,24 @@ use time; use uuid::Uuid; /// Expected schema version. See `guide/schema.md` for more information. -pub const EXPECTED_VERSION: i32 = 0; +pub const EXPECTED_VERSION: i32 = 1; -const GET_RECORDING_SQL: &'static str = - "select sample_file_uuid, video_index from recording where id = :id"; +const GET_RECORDING_PLAYBACK_SQL: &'static str = r#" + select + sample_file_uuid, + video_index + from + recording_playback + where + composite_id = :composite_id +"#; const DELETE_RESERVATION_SQL: &'static str = "delete from reserved_sample_files where uuid = :uuid"; const INSERT_RESERVATION_SQL: &'static str = r#" insert into reserved_sample_files (uuid, state) - values (:uuid, :state); + values (:uuid, :state) "#; /// Valid values for the `state` column in the `reserved_sample_files` table. @@ -96,38 +103,50 @@ enum ReservationState { const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#" insert into video_sample_entry (sha1, width, height, data) - values (:sha1, :width, :height, :data); + values (:sha1, :width, :height, :data) "#; const INSERT_RECORDING_SQL: &'static str = r#" - insert into recording (camera_id, sample_file_bytes, start_time_90k, - duration_90k, local_time_delta_90k, video_samples, - video_sync_samples, video_sample_entry_id, - sample_file_uuid, sample_file_sha1, video_index) - values (:camera_id, :sample_file_bytes, :start_time_90k, - :duration_90k, :local_time_delta_90k, - :video_samples, :video_sync_samples, - :video_sample_entry_id, :sample_file_uuid, - :sample_file_sha1, :video_index); + insert into recording (composite_id, camera_id, run_offset, flags, sample_file_bytes, + start_time_90k, duration_90k, local_time_delta_90k, video_samples, + video_sync_samples, video_sample_entry_id) + values (:composite_id, :camera_id, :run_offset, :flags, :sample_file_bytes, + :start_time_90k, :duration_90k, :local_time_delta_90k, + :video_samples, :video_sync_samples, :video_sample_entry_id) "#; +const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#" + insert into recording_playback (composite_id, sample_file_uuid, sample_file_sha1, video_index) + values (:composite_id, :sample_file_uuid, :sample_file_sha1, + :video_index) +"#; + +const UPDATE_NEXT_RECORDING_ID_SQL: &'static str = + "update camera set next_recording_id = :next_recording_id where id = :camera_id"; + const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#" select - id, - sample_file_uuid, - start_time_90k, - duration_90k, - sample_file_bytes + recording.composite_id, + recording_playback.sample_file_uuid, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes from recording + join recording_playback on (recording.composite_id = recording_playback.composite_id) where - camera_id = :camera_id + :start <= recording.composite_id and + recording.composite_id < :end order by - start_time_90k + recording.composite_id "#; const DELETE_RECORDING_SQL: &'static str = r#" - delete from recording where id = :recording_id; + delete from recording where composite_id = :composite_id +"#; + +const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#" + delete from recording_playback where composite_id = :composite_id "#; const CAMERA_MIN_START_SQL: &'static str = r#" @@ -137,7 +156,7 @@ const CAMERA_MIN_START_SQL: &'static str = r#" recording where camera_id = :camera_id - order by start_time_90k limit 1; + order by start_time_90k limit 1 "#; const CAMERA_MAX_START_SQL: &'static str = r#" @@ -151,6 +170,26 @@ const CAMERA_MAX_START_SQL: &'static str = r#" order by start_time_90k desc; "#; +const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#" + select + recording.composite_id, + recording.run_offset, + recording.flags, + recording.start_time_90k, + recording.duration_90k, + recording.sample_file_bytes, + recording.video_samples, + recording.video_sync_samples, + recording.video_sample_entry_id + from + recording + where + :start <= composite_id and + composite_id < :end + order by + recording.composite_id +"#; + /// A concrete box derived from a ISO/IEC 14496-12 section 8.5.2 VisualSampleEntry box. Describes /// the codec, width, height, etc. #[derive(Debug)] @@ -162,42 +201,56 @@ pub struct VideoSampleEntry { pub data: Vec, } -/// A row used in `list_recordings`. +/// A row used in `list_recordings_by_time` and `list_recordings_by_id`. #[derive(Debug)] -pub struct ListCameraRecordingsRow { - pub id: i64, +pub struct ListRecordingsRow { pub start: recording::Time, + pub video_sample_entry: Arc, + + pub camera_id: i32, + pub id: i32, /// This is a recording::Duration, but a single recording's duration fits into an i32. pub duration_90k: i32, pub video_samples: i32, pub video_sync_samples: i32, pub sample_file_bytes: i32, - pub video_sample_entry: Arc, + pub run_offset: i32, + pub flags: i32, } /// A row used in `list_aggregated_recordings`. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ListAggregatedRecordingsRow { - pub range: Range, + pub time: Range, + pub ids: Range, pub video_samples: i64, pub video_sync_samples: i64, pub sample_file_bytes: i64, pub video_sample_entry: Arc, + pub camera_id: i32, + pub flags: i32, + pub run_start_id: i32, } -/// Extra data about a recording, beyond what is returned by ListCameraRecordingsRow. -/// Retrieve with `get_recording`. +/// Select fields from the `recordings_playback` table. Retrieve with `get_recording_playback`. #[derive(Debug)] -pub struct ExtraRecording { +pub struct RecordingPlayback { pub sample_file_uuid: Uuid, pub video_index: Vec } +/// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`. +pub enum RecordingFlags { + TrailingZero = 1, +} + /// A recording to pass to `insert_recording`. #[derive(Debug)] pub struct RecordingToInsert { pub camera_id: i32, + pub run_offset: i32, + pub flags: i32, pub sample_file_bytes: i32, pub time: Range, pub local_time: recording::Time, @@ -214,7 +267,7 @@ pub struct RecordingToInsert { pub struct ListOldestSampleFilesRow { pub uuid: Uuid, pub camera_id: i32, - pub recording_id: i64, + pub recording_id: i32, pub time: Range, pub sample_file_bytes: i32, } @@ -285,6 +338,8 @@ pub struct Camera { /// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day. pub days: BTreeMap, + + next_recording_id: i32, } /// Adds `delta` to the day represented by `day` in the map `m`. @@ -431,8 +486,8 @@ struct State { cameras_by_id: BTreeMap, cameras_by_uuid: BTreeMap, video_sample_entries: BTreeMap>, - list_recordings_sql: String, - recording_cache: RefCell, fnv::FnvBuildHasher>>, + list_recordings_by_time_sql: String, + playback_cache: RefCell, fnv::FnvBuildHasher>>, } /// A high-level transaction. This manages the SQLite transaction and the matching modification to @@ -443,10 +498,9 @@ pub struct Transaction<'a> { tx: rusqlite::Transaction<'a>, /// True if due to an earlier error the transaction must be rolled back rather than committed. - /// Insert and delete are two-part, requiring a delete from the `reserve_sample_files` table - /// and an insert to the `recording` table (or vice versa). If the latter half fails, the - /// former should be aborted as well. We could use savepoints (nested transactions) for this, - /// but for simplicity we just require the entire transaction be rolled back. + /// Insert and delete are multi-part. If later parts fail, earlier parts should be aborted as + /// well. We could use savepoints (nested transactions) for this, but for simplicity we just + /// require the entire transaction be rolled back. must_rollback: bool, /// Normally sample file uuids must be reserved prior to a recording being inserted. @@ -470,6 +524,13 @@ struct CameraModification { /// Reset the Camera range to this value. This should be populated immediately prior to the /// commit. range: Option>, + + /// Reset the next_recording_id to the specified value. + new_next_recording_id: Option, +} + +fn composite_id(camera_id: i32, recording_id: i32) -> i64 { + (camera_id as i64) << 32 | recording_id as i64 } impl<'a> Transaction<'a> { @@ -486,21 +547,28 @@ impl<'a> Transaction<'a> { Ok(uuid) } - /// Deletes the given recordings from the `recording` table. + /// Deletes the given recordings from the `recording` and `recording_playback` tables. /// Note they are not fully removed from the database; the uuids are transferred to the /// `reserved_sample_files` table. The caller should `unlink` the files, then remove the /// reservation. pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> { - let mut del = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; + let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?; + let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?; let mut insert = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?; self.check_must_rollback()?; self.must_rollback = true; for row in rows { - let changes = del.execute_named(&[(":recording_id", &row.recording_id)])?; + let composite_id = &composite_id(row.camera_id, row.recording_id); + let changes = del1.execute_named(&[(":composite_id", composite_id)])?; if changes != 1 { - return Err(Error::new(format!("no such recording {} (camera {}, uuid {})", - row.recording_id, row.camera_id, row.uuid))); + return Err(Error::new(format!("no such recording {}/{} (uuid {})", + row.camera_id, row.recording_id, row.uuid))); + } + let changes = del2.execute_named(&[(":composite_id", composite_id)])?; + if changes != 1 { + return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})", + row.camera_id, row.recording_id, row.uuid))); } let uuid = &row.uuid.as_bytes()[..]; insert.execute_named(&[ @@ -546,9 +614,10 @@ impl<'a> Transaction<'a> { } // Unreserve the sample file uuid and insert the recording row. - if self.state.cameras_by_id.get_mut(&r.camera_id).is_none() { - return Err(Error::new(format!("no such camera id {}", r.camera_id))); - } + let cam = match self.state.cameras_by_id.get_mut(&r.camera_id) { + None => return Err(Error::new(format!("no such camera id {}", r.camera_id))), + Some(c) => c, + }; let uuid = &r.sample_file_uuid.as_bytes()[..]; { let mut stmt = self.tx.prepare_cached(DELETE_RESERVATION_SQL)?; @@ -558,11 +627,16 @@ impl<'a> Transaction<'a> { } } self.must_rollback = true; + let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id); { + let recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id); + let composite_id = composite_id(r.camera_id, recording_id); let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?; - let sha1 = &r.sample_file_sha1[..]; stmt.execute_named(&[ + (":composite_id", &composite_id), (":camera_id", &(r.camera_id as i64)), + (":run_offset", &r.run_offset), + (":flags", &r.flags), (":sample_file_bytes", &r.sample_file_bytes), (":start_time_90k", &r.time.start.0), (":duration_90k", &(r.time.end.0 - r.time.start.0)), @@ -570,13 +644,23 @@ impl<'a> Transaction<'a> { (":video_samples", &r.video_samples), (":video_sync_samples", &r.video_sync_samples), (":video_sample_entry_id", &r.video_sample_entry_id), + ])?; + m.new_next_recording_id = Some(recording_id + 1); + let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?; + let sha1 = &r.sample_file_sha1[..]; + stmt.execute_named(&[ + (":composite_id", &composite_id), (":sample_file_uuid", &uuid), (":sample_file_sha1", &sha1), (":video_index", &r.video_index), ])?; + let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; + stmt.execute_named(&[ + (":camera_id", &(r.camera_id as i64)), + (":next_recording_id", &m.new_next_recording_id), + ])?; } self.must_rollback = false; - let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id); m.duration += r.time.end - r.time.start; m.sample_file_bytes += r.sample_file_bytes as i64; adjust_days(r.time.clone(), 1, &mut m.days); @@ -597,6 +681,9 @@ impl<'a> Transaction<'a> { adjust_day(*k, *v, &mut camera.days); } camera.range = m.range.clone(); + if let Some(id) = m.new_next_recording_id { + camera.next_recording_id = id; + } } Ok(()) } @@ -618,6 +705,7 @@ impl<'a> Transaction<'a> { sample_file_bytes: 0, range: None, days: BTreeMap::new(), + new_next_recording_id: None, } }) } @@ -697,32 +785,53 @@ impl LockedDatabase { /// Lists the specified recordings in ascending order, passing them to a supplied function. /// Given that the function is called with the database lock held, it should be quick. - pub fn list_recordings(&self, camera_id: i32, desired_time: &Range, - mut f: F) -> Result<(), Error> - where F: FnMut(ListCameraRecordingsRow) -> Result<(), Error> { - let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_sql)?; - let mut rows = stmt.query_named(&[ + pub fn list_recordings_by_time(&self, camera_id: i32, desired_time: Range, + f: F) -> Result<(), Error> + where F: FnMut(ListRecordingsRow) -> Result<(), Error> { + let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?; + let rows = stmt.query_named(&[ (":camera_id", &camera_id), (":start_time_90k", &desired_time.start.0), (":end_time_90k", &desired_time.end.0)])?; + self.list_recordings_inner(camera_id, rows, f) + } + + pub fn list_recordings_by_id(&self, camera_id: i32, desired_ids: Range, f: F) + -> Result<(), Error> + where F: FnMut(ListRecordingsRow) -> Result<(), Error> { + let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?; + let rows = stmt.query_named(&[ + (":start", &composite_id(camera_id, desired_ids.start)), + (":end", &composite_id(camera_id, desired_ids.end)), + ])?; + self.list_recordings_inner(camera_id, rows, f) + } + + fn list_recordings_inner(&self, camera_id: i32, mut rows: rusqlite::Rows, mut f: F) + -> Result<(), Error> + where F: FnMut(ListRecordingsRow) -> Result<(), Error> { while let Some(row) = rows.next() { let row = row?; - let id = row.get_checked(0)?; - let vse_id = row.get_checked(6)?; + let id = row.get_checked::<_, i64>(0)? as i32; // drop top bits of composite_id. + let vse_id = row.get_checked(8)?; let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) { Some(v) => v, None => { return Err(Error::new(format!( - "recording {} references nonexistent video_sample_entry {}", id, vse_id))); + "recording {}/{} references nonexistent video_sample_entry {}", + camera_id, id, vse_id))); }, }; - let out = ListCameraRecordingsRow{ + let out = ListRecordingsRow{ + camera_id: camera_id, id: id, - start: recording::Time(row.get_checked(1)?), - duration_90k: row.get_checked(2)?, - sample_file_bytes: row.get_checked(3)?, - video_samples: row.get_checked(4)?, - video_sync_samples: row.get_checked(5)?, + run_offset: row.get_checked(1)?, + flags: row.get_checked(2)?, + start: recording::Time(row.get_checked(3)?), + duration_90k: row.get_checked(4)?, + sample_file_bytes: row.get_checked(5)?, + video_samples: row.get_checked(6)?, + video_sync_samples: row.get_checked(7)?, video_sample_entry: video_sample_entry.clone(), }; f(out)?; @@ -730,73 +839,101 @@ impl LockedDatabase { Ok(()) } - /// Convenience method which calls `list_recordings` and aggregates consecutive recordings. + /// Calls `list_recordings_by_time` and aggregates consecutive recordings. + /// Rows are given to the callback in arbitrary order. Callers which care about ordering + /// should do their own sorting. pub fn list_aggregated_recordings(&self, camera_id: i32, - desired_time: &Range, + desired_time: Range, forced_split: recording::Duration, mut f: F) -> Result<(), Error> - where F: FnMut(ListAggregatedRecordingsRow) -> Result<(), Error> { - let mut agg: Option = None; - self.list_recordings(camera_id, desired_time, |row| { - let needs_flush = if let Some(ref a) = agg { - let new_dur = a.range.end - a.range.start + + where F: FnMut(&ListAggregatedRecordingsRow) -> Result<(), Error> { + // Iterate, maintaining a map from a recording_id to the aggregated row for the latest + // batch of recordings from the run starting at that id. Runs can be split into multiple + // batches for a few reasons: + // + // * forced split (when exceeding a duration limit) + // * a missing id (one that was deleted out of order) + // * video_sample_entry mismatch (if the parameters changed during a RTSP session) + // + // This iteration works because in a run, the start_time+duration of recording id r + // is equal to the start_time of recording id r+1. Thus ascending times guarantees + // ascending ids within a run. (Different runs, however, can be arbitrarily interleaved if + // their timestamps overlap. Tracking all active runs prevents that interleaving from + // causing problems.) + let mut aggs: BTreeMap = BTreeMap::new(); + self.list_recordings_by_time(camera_id, desired_time, |row| { + let run_start_id = row.id - row.run_offset; + let needs_flush = if let Some(a) = aggs.get(&run_start_id) { + let new_dur = a.time.end - a.time.start + recording::Duration(row.duration_90k as i64); - a.range.end != row.start || - row.video_sample_entry.id != a.video_sample_entry.id || new_dur >= forced_split + a.ids.end != row.id || row.video_sample_entry.id != a.video_sample_entry.id || + new_dur >= forced_split } else { false }; if needs_flush { - let a = agg.take().expect("needs_flush when agg is none"); - f(a)?; + let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None"); + f(&a)?; } - match agg { - None => { - agg = Some(ListAggregatedRecordingsRow{ - range: row.start .. recording::Time(row.start.0 + row.duration_90k as i64), + let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) { + if a.time.end != row.start { + return Err(Error::new(format!( + "camera {} recording {} ends at {}; {} starts at {}; expected same", + camera_id, a.ids.end - 1, a.time.end, row.id, row.start))); + } + a.time.end.0 += row.duration_90k as i64; + a.ids.end = row.id + 1; + a.video_samples += row.video_samples as i64; + a.video_sync_samples += row.video_sync_samples as i64; + a.sample_file_bytes += row.sample_file_bytes as i64; + false + } else { + true + }; + if need_insert { + aggs.insert(run_start_id, ListAggregatedRecordingsRow{ + time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64), + ids: row.id .. row.id+1, video_samples: row.video_samples as i64, video_sync_samples: row.video_sync_samples as i64, sample_file_bytes: row.sample_file_bytes as i64, video_sample_entry: row.video_sample_entry, - }); - }, - Some(ref mut a) => { - a.range.end.0 += row.duration_90k as i64; - a.video_samples += row.video_samples as i64; - a.video_sync_samples += row.video_sync_samples as i64; - a.sample_file_bytes += row.sample_file_bytes as i64; - } + camera_id: camera_id, + run_start_id: row.id - row.run_offset, + flags: row.flags, + }); }; Ok(()) })?; - if let Some(a) = agg { + for a in aggs.values() { f(a)?; } Ok(()) } - /// Gets extra data about a single recording. + /// Gets a single `recording_playback` row. /// This uses a LRU cache to reduce the number of retrievals from the database. - pub fn get_recording(&self, recording_id: i64) - -> Result, Error> { - let mut cache = self.state.recording_cache.borrow_mut(); - if let Some(r) = cache.get_mut(&recording_id) { - debug!("cache hit for recording {}", recording_id); + pub fn get_recording_playback(&self, camera_id: i32, recording_id: i32) + -> Result, Error> { + let composite_id = composite_id(camera_id, recording_id); + let mut cache = self.state.playback_cache.borrow_mut(); + if let Some(r) = cache.get_mut(&composite_id) { + trace!("cache hit for recording {}/{}", camera_id, recording_id); return Ok(r.clone()); } - debug!("cache miss for recording {}", recording_id); - let mut stmt = self.conn.prepare_cached(GET_RECORDING_SQL)?; - let mut rows = stmt.query_named(&[(":id", &recording_id)])?; + trace!("cache miss for recording {}/{}", camera_id, recording_id); + let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?; + let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?; if let Some(row) = rows.next() { let row = row?; - let r = Arc::new(ExtraRecording{ + let r = Arc::new(RecordingPlayback{ sample_file_uuid: get_uuid(&row, 0)?, video_index: row.get_checked(1)?, }); - cache.insert(recording_id, r.clone()); + cache.insert(composite_id, r.clone()); return Ok(r); } - Err(Error::new(format!("no such recording {}", recording_id))) + Err(Error::new(format!("no such recording {}/{}", camera_id, recording_id))) } /// Lists all reserved sample files. @@ -816,15 +953,19 @@ impl LockedDatabase { pub fn list_oldest_sample_files(&self, camera_id: i32, mut f: F) -> Result<(), Error> where F: FnMut(ListOldestSampleFilesRow) -> bool { let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?; - let mut rows = stmt.query_named(&[(":camera_id", &(camera_id as i64))])?; + let mut rows = stmt.query_named(&[ + (":start", &composite_id(camera_id, 0)), + (":end", &composite_id(camera_id + 1, 0)), + ])?; while let Some(row) = rows.next() { let row = row?; let start = recording::Time(row.get_checked(2)?); let duration = recording::Duration(row.get_checked(3)?); + let composite_id: i64 = row.get_checked(0)?; let should_continue = f(ListOldestSampleFilesRow{ - recording_id: row.get_checked(0)?, + recording_id: composite_id as i32, + camera_id: (composite_id >> 32) as i32, uuid: get_uuid(&row, 1)?, - camera_id: camera_id, time: start .. start + duration, sample_file_bytes: row.get_checked(4)?, }); @@ -888,7 +1029,8 @@ impl LockedDatabase { camera.password, camera.main_rtsp_path, camera.sub_rtsp_path, - camera.retain_bytes + camera.retain_bytes, + next_recording_id from camera; "#)?; @@ -912,6 +1054,7 @@ impl LockedDatabase { sample_file_bytes: 0, duration: recording::Duration(0), days: BTreeMap::new(), + next_recording_id: row.get_checked(10)?, }); self.state.cameras_by_uuid.insert(uuid, id); } @@ -970,9 +1113,11 @@ pub struct Database(Mutex); impl Database { /// Creates the database from a caller-supplied SQLite connection. pub fn new(conn: rusqlite::Connection) -> Result { - let list_recordings_sql = format!(r#" + let list_recordings_by_time_sql = format!(r#" select - recording.id, + recording.composite_id, + recording.run_offset, + recording.flags, recording.start_time_90k, recording.duration_90k, recording.sample_file_bytes, @@ -987,7 +1132,7 @@ impl Database { recording.start_time_90k < :end_time_90k and recording.start_time_90k + recording.duration_90k > :start_time_90k order by - recording.start_time_90k + recording.composite_id "#, recording::MAX_RECORDING_DURATION); { use std::error::Error as E; @@ -999,7 +1144,7 @@ impl Database { \ If you are starting from an \ empty database, see README.md to complete the \ - installation. If you are starting from + installation. If you are starting from complete the schema. If you are starting from a database \ that predates schema versioning, see guide/schema.md." .to_owned())); @@ -1025,8 +1170,8 @@ impl Database { cameras_by_id: BTreeMap::new(), cameras_by_uuid: BTreeMap::new(), video_sample_entries: BTreeMap::new(), - recording_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), - list_recordings_sql: list_recordings_sql, + playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())), + list_recordings_by_time_sql: list_recordings_by_time_sql, }, })); { @@ -1078,9 +1223,9 @@ mod tests { let uuid_bytes = &uuid.as_bytes()[..]; conn.execute_named(r#" insert into camera (uuid, short_name, description, host, username, password, - main_rtsp_path, sub_rtsp_path, retain_bytes) + main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id) values (:uuid, :short_name, :description, :host, :username, :password, - :main_rtsp_path, :sub_rtsp_path, :retain_bytes) + :main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id) "#, &[ (":uuid", &uuid_bytes), (":short_name", &short_name), @@ -1091,6 +1236,7 @@ mod tests { (":main_rtsp_path", &"/main"), (":sub_rtsp_path", &"/sub"), (":retain_bytes", &42i64), + (":next_recording_id", &0i64), ]).unwrap(); conn.last_insert_rowid() as i32 } @@ -1121,7 +1267,7 @@ mod tests { { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); - db.list_recordings(camera_id, &all_time, |_row| { + db.list_recordings_by_time(camera_id, all_time, |_row| { rows += 1; Ok(()) }).unwrap(); @@ -1152,7 +1298,7 @@ mod tests { { let db = db.lock(); let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); - db.list_recordings(camera_id, &all_time, |row| { + db.list_recordings_by_time(camera_id, all_time, |row| { rows += 1; recording_id = row.id; assert_eq!(r.time, @@ -1176,7 +1322,8 @@ mod tests { }).unwrap(); assert_eq!(1, rows); - // TODO: get_recording. + // TODO: list_aggregated_recordings. + // TODO: get_recording_playback. } fn assert_unsorted_eq(mut a: Vec, mut b: Vec) @@ -1251,10 +1398,10 @@ mod tests { fn test_version_too_old() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (-1, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (0, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version -1 is too old (expected 0)"), "got: {:?}", + "Database schema version 0 is too old (expected 1)"), "got: {:?}", e.description()); } @@ -1262,10 +1409,10 @@ mod tests { fn test_version_too_new() { testutil::init(); let c = setup_conn(); - c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap(); + c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap(); let e = Database::new(c).unwrap_err(); assert!(e.description().starts_with( - "Database schema version 1 is too new (expected 0)"), "got: {:?}", e.description()); + "Database schema version 2 is too new (expected 1)"), "got: {:?}", e.description()); } /// Basic test of running some queries on a fresh database. @@ -1310,6 +1457,8 @@ mod tests { let recording = RecordingToInsert{ camera_id: camera_id, sample_file_bytes: 42, + run_offset: 0, + flags: 0, time: start .. start + recording::Duration(TIME_UNITS_PER_SEC), local_time: start, video_samples: 1, diff --git a/src/dir.rs b/src/dir.rs index e5bc9c5..291714a 100644 --- a/src/dir.rs +++ b/src/dir.rs @@ -33,9 +33,9 @@ //! This includes opening files for serving, rotating away old files, and saving new files. use db; +use error::Error; use libc; use recording; -use error::Error; use openssl::crypto::hash; use std::ffi; use std::fs; @@ -121,7 +121,7 @@ impl SampleFileDir { /// directory has sufficient space for a couple recordings per camera in addition to the /// cameras' total `retain_bytes`. pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time, - local_start: recording::Time, camera_id: i32, + local_start: recording::Time, run_offset: i32, camera_id: i32, video_sample_entry_id: i32) -> Result, Error> { // Grab the next uuid. Typically one is cached—a sync has usually completed since the last // writer was created, and syncs ensure `next_uuid` is filled while performing their @@ -145,7 +145,8 @@ impl SampleFileDir { return Err(e.into()); }, }; - Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id, channel) + Writer::open(f, uuid, start, local_start, run_offset, camera_id, video_sample_entry_id, + channel) } /// Opens a sample file within this directory with the given flags and (if creating) mode. @@ -428,6 +429,7 @@ struct InnerWriter<'a> { local_time: recording::Time, camera_id: i32, video_sample_entry_id: i32, + run_offset: i32, /// A sample which has been written to disk but not added to `index`. Index writes are one /// sample behind disk writes because the duration of a sample is the difference between its @@ -446,8 +448,8 @@ struct UnflushedSample { impl<'a> Writer<'a> { /// Opens the writer; for use by `SampleFileDir` (which should supply `f`). fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time, - camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel) - -> Result { + run_offset: i32, camera_id: i32, video_sample_entry_id: i32, + syncer_channel: &'a SyncerChannel) -> Result { Ok(Writer(Some(InnerWriter{ syncer_channel: syncer_channel, f: f, @@ -459,6 +461,7 @@ impl<'a> Writer<'a> { local_time: local_time, camera_id: camera_id, video_sample_entry_id: video_sample_entry_id, + run_offset: run_offset, unflushed_sample: None, }))) } @@ -530,6 +533,8 @@ impl<'a> InnerWriter<'a> { sample_file_uuid: self.uuid, video_index: self.index.video_index, sample_file_sha1: sha1_bytes, + run_offset: self.run_offset, + flags: 0, // TODO }; self.syncer_channel.async_save_recording(recording, self.f); Ok(end) diff --git a/src/main.rs b/src/main.rs index d927109..f6b44fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,6 +81,7 @@ mod stream; mod streamer; mod strutil; #[cfg(test)] mod testutil; +mod upgrade; mod web; /// Commandline usage string. This is in the particular format expected by the `docopt` crate. @@ -88,20 +89,30 @@ mod web; /// allowed commandline arguments and their defaults. const USAGE: &'static str = " Usage: moonfire-nvr [options] + moonfire-nvr --upgrade [options] moonfire-nvr (--help | --version) Options: -h, --help Show this message. --version Show the version of moonfire-nvr. - --db-dir DIR Set the directory holding the SQLite3 index database. + --db-dir=DIR Set the directory holding the SQLite3 index database. This is typically on a flash device. [default: /var/lib/moonfire-nvr/db] - --sample-file-dir DIR Set the directory holding video data. + --sample-file-dir=DIR Set the directory holding video data. This is typically on a hard drive. [default: /var/lib/moonfire-nvr/sample] - --http-addr ADDR Set the bind address for the unencrypted HTTP server. + --http-addr=ADDR Set the bind address for the unencrypted HTTP server. [default: 0.0.0.0:8080] --read-only Forces read-only mode / disables recording. + --preset-journal=MODE With --upgrade, resets the SQLite journal_mode to + the specified mode prior to the upgrade. The default, + delete, is recommended. off is very dangerous but + may be desirable in some circumstances. See + guide/schema.md for more information. The journal + mode will be reset to wal after the upgrade. + [default: delete] + --no-vacuum With --upgrade, skips the normal post-upgrade vacuum + operation. "; /// Commandline arguments corresponding to `USAGE`; automatically filled by the `docopt` crate. @@ -111,9 +122,18 @@ struct Args { flag_sample_file_dir: String, flag_http_addr: String, flag_read_only: bool, + flag_upgrade: bool, + flag_no_vacuum: bool, + flag_preset_journal: String, } fn main() { + // Parse commandline arguments. + let version = "Moonfire NVR 0.1.0".to_owned(); + let args: Args = docopt::Docopt::new(USAGE) + .and_then(|d| d.version(Some(version)).decode()) + .unwrap_or_else(|e| e.exit()); + // Watch for termination signals. // This must be started before any threads are spawned (such as the async logger thread) so // that signals will be blocked in all threads. @@ -124,12 +144,6 @@ fn main() { let drain = slog_envlogger::new(drain); slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap(); - // Parse commandline arguments. - let version = "Moonfire NVR 0.1.0".to_owned(); - let args: Args = docopt::Docopt::new(USAGE) - .and_then(|d| d.version(Some(version)).decode()) - .unwrap_or_else(|e| e.exit()); - // Open the database and populate cached state. let db_dir = dir::Fd::open(&args.flag_db_dir).unwrap(); db_dir.lock(if args.flag_read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB) @@ -144,6 +158,15 @@ fn main() { // rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the // serialized threading mode. rusqlite::SQLITE_OPEN_NO_MUTEX).unwrap(); + + if args.flag_upgrade { + upgrade::run(conn, &args.flag_preset_journal, args.flag_no_vacuum).unwrap(); + } else { + run(args, conn, &signal); + } +} + +fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver) { let db = Arc::new(db::Database::new(conn).unwrap()); let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap(); info!("Database is loaded."); diff --git a/src/mp4.rs b/src/mp4.rs index fd6f79d..344efa4 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -543,11 +543,16 @@ impl Mp4FileBuilder { self.segments.reserve(additional); } - pub fn len(&self) -> usize { self.segments.len() } - /// Appends a segment for (a subset of) the given recording. - pub fn append(&mut self, db: &MutexGuard, row: db::ListCameraRecordingsRow, + pub fn append(&mut self, db: &MutexGuard, row: db::ListRecordingsRow, rel_range_90k: Range) -> Result<()> { + if let Some(prev) = self.segments.last() { + if prev.s.have_trailing_zero { + return Err(Error::new(format!( + "unable to append recording {}/{} after recording {}/{} with trailing zero", + row.camera_id, row.id, prev.s.camera_id, prev.s.recording_id))); + } + } self.segments.push(Mp4Segment{ s: recording::Segment::new(db, &row, rel_range_90k)?, index: RefCell::new(None), @@ -591,7 +596,8 @@ impl Mp4FileBuilder { // Update the etag to reflect this segment. let mut data = [0_u8; 24]; let mut cursor = io::Cursor::new(&mut data[..]); - cursor.write_i64::(s.s.id)?; + cursor.write_i32::(s.s.camera_id)?; + cursor.write_i32::(s.s.recording_id)?; cursor.write_i64::(s.s.start.0)?; cursor.write_i32::(d.start)?; cursor.write_i32::(d.end)?; @@ -1129,7 +1135,8 @@ impl Mp4File { fn write_video_sample_data(&self, i: usize, r: Range, out: &mut io::Write) -> Result<()> { let s = &self.segments[i]; - let f = self.dir.open_sample_file(self.db.lock().get_recording(s.s.id)?.sample_file_uuid)?; + let rec = self.db.lock().get_recording_playback(s.s.camera_id, s.s.recording_id)?; + let f = self.dir.open_sample_file(rec.sample_file_uuid)?; mmapfile::MmapFileSlice::new(f, s.s.sample_file_range()).write_to(r, out) } @@ -1180,8 +1187,8 @@ mod tests { use byteorder::{BigEndian, ByteOrder}; use db; use dir; - use ffmpeg; use error::Error; + use ffmpeg; #[cfg(nightly)] use hyper; use hyper::header; use openssl::crypto::hash; @@ -1217,10 +1224,10 @@ mod tests { fn flush(&mut self) -> io::Result<()> { Ok(()) } } - /// Returns the SHA-1 digest of the given `Resource`. - fn digest(r: &http_entity::Entity) -> Vec { + /// Returns the SHA-1 digest of the given `Entity`. + fn digest(e: &http_entity::Entity) -> Vec { let mut sha1 = Sha1::new(); - r.write_to(0 .. r.len(), &mut sha1).unwrap(); + e.write_to(0 .. e.len(), &mut sha1).unwrap(); sha1.finish() } @@ -1401,7 +1408,7 @@ mod tests { let extra_data = input.get_extra_data().unwrap(); let video_sample_entry_id = db.db.lock().insert_video_sample_entry( extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap(); - let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, + let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, 0, TEST_CAMERA_ID, video_sample_entry_id).unwrap(); // end_pts is the pts of the end of the most recent frame (start + duration). @@ -1435,6 +1442,7 @@ mod tests { let mut recording = db::RecordingToInsert{ camera_id: TEST_CAMERA_ID, sample_file_bytes: 30104460, + flags: 0, time: START_TIME .. (START_TIME + DURATION), local_time: START_TIME, video_samples: 1800, @@ -1443,6 +1451,7 @@ mod tests { sample_file_uuid: Uuid::nil(), video_index: data, sample_file_sha1: [0; 20], + run_index: 0, }; let mut tx = db.tx().unwrap(); tx.bypass_reservation_for_testing = true; @@ -1451,6 +1460,7 @@ mod tests { recording.time.start += DURATION; recording.local_time += DURATION; recording.time.end += DURATION; + recording.run_index += 1; } tx.commit().unwrap(); } @@ -1462,7 +1472,7 @@ mod tests { let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); { let db = db.lock(); - db.list_recordings(TEST_CAMERA_ID, &all_time, |r| { + db.list_recordings_by_time(TEST_CAMERA_ID, all_time, |r| { let d = r.duration_90k; assert!(skip_90k + shorten_90k < d); builder.append(&db, r, skip_90k .. d - shorten_90k).unwrap(); @@ -1658,7 +1668,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "3c48af4dbce2024db07f27a00789b6af774a8c89"; + const EXPECTED_ETAG: &'static str = "908ae8ac303f66f2f4a1f8f52dba8f6ea9fdb442"; assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.syncer_join.join().unwrap(); @@ -1678,7 +1688,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "c24d7af372e5d8f66f4feb6e3a5cd43828392371"; + const EXPECTED_ETAG: &'static str = "e21c6a6dfede1081db3701cc595ec267c43c2bff"; assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.syncer_join.join().unwrap(); @@ -1698,7 +1708,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("685e026af44204bc9cc52115c5e17058e9fb7c70", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "870e2b3cfef4a988951344b32e53af0d4496894d"; + const EXPECTED_ETAG: &'static str = "1d5c5980f6ba08a4dd52dfd785667d42cdb16992"; assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.syncer_join.join().unwrap(); @@ -1718,7 +1728,7 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "71c329188a2cd175c8d61492a9789e242af06c05"; + const EXPECTED_ETAG: &'static str = "555de64b39615e1a1cbe5bdd565ff197f5f126c5"; assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); drop(db.syncer_channel); db.syncer_join.join().unwrap(); diff --git a/src/recording.rs b/src/recording.rs index 329b4f6..0be4a31 100644 --- a/src/recording.rs +++ b/src/recording.rs @@ -340,7 +340,8 @@ impl SampleIndexEncoder { /// A segment represents a view of some or all of a single recording, starting from a key frame. /// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4. pub struct Segment { - pub id: i64, + pub camera_id: i32, + pub recording_id: i32, pub start: Time, begin: SampleIndexIterator, pub file_end: i32, @@ -349,6 +350,7 @@ pub struct Segment { pub frames: i32, pub key_frames: i32, pub video_sample_entry_id: i32, + pub have_trailing_zero: bool, } impl Segment { @@ -360,10 +362,11 @@ impl Segment { /// undesired portion.) It will end at the first frame after the desired range (unless the /// desired range extends beyond the recording). pub fn new(db: &MutexGuard, - recording: &db::ListCameraRecordingsRow, + recording: &db::ListRecordingsRow, desired_range_90k: Range) -> Result { let mut self_ = Segment{ - id: recording.id, + camera_id: recording.camera_id, + recording_id: recording.id, start: recording.start, begin: SampleIndexIterator::new(), file_end: recording.sample_file_bytes, @@ -372,6 +375,7 @@ impl Segment { frames: recording.video_samples, key_frames: recording.video_sync_samples, video_sample_entry_id: recording.video_sample_entry.id, + have_trailing_zero: (recording.flags & db::RecordingFlags::TrailingZero as i32) != 0, }; if self_.desired_range_90k.start > self_.desired_range_90k.end || @@ -388,8 +392,8 @@ impl Segment { } // Slow path. Need to iterate through the index. - let extra = db.get_recording(self_.id)?; - let data = &(&extra).video_index; + let playback = db.get_recording_playback(self_.camera_id, self_.recording_id)?; + let data = &(&playback).video_index; let mut it = SampleIndexIterator::new(); if !it.next(data)? { return Err(Error{description: String::from("no index"), @@ -429,6 +433,7 @@ impl Segment { } self_.file_end = it.pos; self_.actual_end_90k = it.start_90k; + self_.have_trailing_zero = it.duration_90k == 0; Ok(self_) } @@ -443,38 +448,44 @@ impl Segment { pub fn foreach(&self, db: &db::Database, mut f: F) -> Result<(), Error> where F: FnMut(&SampleIndexIterator) -> Result<(), Error> { - let extra = db.lock().get_recording(self.id)?; - let data = &(&extra).video_index; + trace!("foreach on recording {}/{}: {} frames, actual_time_90k: {:?}", + self.camera_id, self.recording_id, self.frames, self.actual_time_90k()); + let playback = db.lock().get_recording_playback(self.camera_id, self.recording_id)?; + let data = &(&playback).video_index; let mut it = self.begin; if it.i == 0 { if !it.next(data)? { - return Err(Error::new(format!("recording {}: no frames", self.id))); + return Err(Error::new(format!("recording {}/{}: no frames", + self.camera_id, self.recording_id))); } if !it.is_key { - return Err(Error::new(format!("recording {}: doesn't start with key frame", - self.id))); + return Err(Error::new(format!("recording {}/{}: doesn't start with key frame", + self.camera_id, self.recording_id))); } } let mut have_frame = true; let mut key_frame = 0; for i in 0 .. self.frames { if !have_frame { - return Err(Error::new(format!("recording {}: expected {} frames, found only {}", - self.id, self.frames, i+1))); + return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}", + self.camera_id, self.recording_id, self.frames, + i+1))); } if it.is_key { key_frame += 1; if key_frame > self.key_frames { - return Err(Error::new(format!("recording {}: more than expected {} key frames", - self.id, self.key_frames))); + return Err(Error::new(format!( + "recording {}/{}: more than expected {} key frames", + self.camera_id, self.recording_id, self.key_frames))); } } f(&it)?; have_frame = it.next(data)?; } if key_frame < self.key_frames { - return Err(Error::new(format!("recording {}: expected {} key frames, found only {}", - self.id, self.key_frames, key_frame))); + return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}", + self.camera_id, self.recording_id, self.key_frames, + key_frame))); } Ok(()) } diff --git a/src/schema.sql b/src/schema.sql index 6c223e5..75e8ca2 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -31,8 +31,6 @@ -- schema.sql: SQLite3 database schema for Moonfire NVR. -- See also design/schema.md. ---pragma journal_mode = wal; - -- This table tracks the schema version. -- There is one row for the initial database creation (inserted below, after the -- create statements) and one for each upgrade procedure (if any). @@ -49,10 +47,10 @@ create table version ( create table camera ( id integer primary key, - uuid blob unique,-- not null check (length(uuid) = 16), + uuid blob unique not null check (length(uuid) = 16), -- A short name of the camera, used in log messages. - short_name text,-- not null, + short_name text not null, -- A short description of the camera. description text, @@ -77,14 +75,42 @@ create table camera ( -- The number of bytes of video to retain, excluding the currently-recording -- file. Older files will be deleted as necessary to stay within this limit. - retain_bytes integer not null check (retain_bytes >= 0) + retain_bytes integer not null check (retain_bytes >= 0), + + -- The low 32 bits of the next recording id to assign for this camera. + -- Typically this is the maximum current recording + 1, but it does + -- not decrease if that recording is deleted. + next_recording_id integer not null check (next_recording_id >= 0) ); -- Each row represents a single completed recorded segment of video. -- Recordings are typically ~60 seconds; never more than 5 minutes. create table recording ( - id integer primary key, - camera_id integer references camera (id) not null, + -- The high 32 bits of composite_id are taken from the camera's id, which + -- improves locality. The low 32 bits are taken from the camera's + -- next_recording_id (which should be post-incremented in the same + -- transaction). It'd be simpler to use a "without rowid" table and separate + -- fields to make up the primary key, but + -- points out that "without rowid" + -- is not appropriate when the average row size is in excess of 50 bytes. + -- These rows are typically 1--5 KiB. + composite_id integer primary key, + + -- This field is redundant with id above, but used to enforce the reference + -- constraint and to structure the recording_start_time index. + camera_id integer not null references camera (id), + + -- The offset of this recording within a run. 0 means this was the first + -- recording made from a RTSP session. The start of the run has id + -- (id-run_offset). + run_offset integer not null, + + -- flags is a bitmask: + -- + -- * 1, or "trailing zero", indicates that this recording is the last in a + -- stream. As the duration of a sample is not known until the next sample + -- is received, the final sample in this recording will have duration 0. + flags integer not null, sample_file_bytes integer not null check (sample_file_bytes > 0), @@ -100,18 +126,16 @@ create table recording ( -- The number of 90 kHz units the local system time is ahead of the -- recording; negative numbers indicate the local system time is behind - -- the recording. Large values would indicate that the local time has jumped - -- during recording or that the local time and camera time frequencies do - -- not match. + -- the recording. Large absolute values would indicate that the local time + -- has jumped during recording or that the local time and camera time + -- frequencies do not match. local_time_delta_90k integer not null, video_samples integer not null check (video_samples > 0), video_sync_samples integer not null check (video_samples > 0), video_sample_entry_id integer references video_sample_entry (id), - sample_file_uuid blob not null check (length(sample_file_uuid) = 16), - sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), - video_index blob not null check (length(video_index) > 0) + check (composite_id >> 32 = camera_id) ); create index recording_cover on recording ( @@ -124,11 +148,17 @@ create index recording_cover on recording ( -- to consult the underlying row. duration_90k, video_samples, - video_sync_samples, video_sample_entry_id, sample_file_bytes ); +create table recording_playback ( + composite_id integer primary key references recording (composite_id), + sample_file_uuid blob not null check (length(sample_file_uuid) = 16), + sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), + video_index blob not null check (length(video_index) > 0) +); + -- Files in the sample file directory which may be present but should simply be -- discarded on startup. (Recordings which were never completed or have been -- marked for completion.) @@ -156,4 +186,4 @@ create table video_sample_entry ( ); insert into version (id, unix_time, notes) - values (0, cast(strftime('%s', 'now') as int), 'db creation'); + values (1, cast(strftime('%s', 'now') as int), 'db creation'); diff --git a/src/streamer.rs b/src/streamer.rs index 2734c8d..2cee0a9 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -117,6 +117,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { let mut writer: Option = None; let mut transformed = Vec::new(); let mut next_start = None; + let mut run_index = -1; while !self.shutdown.load(Ordering::SeqCst) { let pkt = stream.get_next()?; let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?; @@ -144,9 +145,10 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream { if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r }); let local_realtime = recording::Time::new(frame_realtime); + run_index += 1; self.dir.create_writer(&self.syncer_channel, next_start.unwrap_or(local_realtime), local_realtime, - self.camera_id, video_sample_entry_id)? + run_index, self.camera_id, video_sample_entry_id)? }, }; let orig_data = match pkt.data() { @@ -276,8 +278,9 @@ mod tests { is_key: bool, } - fn get_frames(db: &MutexGuard, recording_id: i64) -> Vec { - let rec = db.get_recording(recording_id).unwrap(); + fn get_frames(db: &MutexGuard, camera_id: i32, recording_id: i32) + -> Vec { + let rec = db.get_recording_playback(camera_id, recording_id).unwrap(); let mut it = recording::SampleIndexIterator::new(); let mut frames = Vec::new(); while it.next(&rec.video_index).unwrap() { @@ -328,7 +331,7 @@ mod tests { // Compare frame-by-frame. Note below that while the rotation is scheduled to happen near // 5-second boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next key // frame, which in this case is 00:00:07. - assert_eq!(get_frames(&db, 1), &[ + assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 1), &[ Frame{start_90k: 0, duration_90k: 90379, is_key: true}, Frame{start_90k: 90379, duration_90k: 89884, is_key: false}, Frame{start_90k: 180263, duration_90k: 89749, is_key: false}, @@ -338,7 +341,7 @@ mod tests { Frame{start_90k: 540015, duration_90k: 90021, is_key: false}, Frame{start_90k: 630036, duration_90k: 89958, is_key: false}, ]); - assert_eq!(get_frames(&db, 2), &[ + assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 2), &[ Frame{start_90k: 0, duration_90k: 90011, is_key: true}, Frame{start_90k: 90011, duration_90k: 0, is_key: false}, ]); diff --git a/src/testutil.rs b/src/testutil.rs index deb2228..a9e5dd2 100644 --- a/src/testutil.rs +++ b/src/testutil.rs @@ -88,9 +88,9 @@ impl TestDb { let uuid_bytes = &TEST_CAMERA_UUID.as_bytes()[..]; conn.execute_named(r#" insert into camera (uuid, short_name, description, host, username, password, - main_rtsp_path, sub_rtsp_path, retain_bytes) + main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id) values (:uuid, :short_name, :description, :host, :username, :password, - :main_rtsp_path, :sub_rtsp_path, :retain_bytes) + :main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id) "#, &[ (":uuid", &uuid_bytes), (":short_name", &"test camera"), @@ -101,6 +101,7 @@ impl TestDb { (":main_rtsp_path", &"/main"), (":sub_rtsp_path", &"/sub"), (":retain_bytes", &1048576i64), + (":next_recording_id", &1i64), ]).unwrap(); assert_eq!(TEST_CAMERA_ID as i64, conn.last_insert_rowid()); let db = sync::Arc::new(db::Database::new(conn).unwrap()); @@ -117,7 +118,7 @@ impl TestDb { } pub fn create_recording_from_encoder(&self, encoder: recording::SampleIndexEncoder) - -> db::ListCameraRecordingsRow { + -> db::ListRecordingsRow { let mut db = self.db.lock(); let video_sample_entry_id = db.insert_video_sample_entry(1920, 1080, &[0u8; 100]).unwrap(); @@ -137,12 +138,15 @@ impl TestDb { sample_file_uuid: Uuid::nil(), video_index: encoder.video_index, sample_file_sha1: [0u8; 20], + run_offset: 0, // TODO + flags: 0, // TODO }).unwrap(); tx.commit().unwrap(); } let mut row = None; let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); - db.list_recordings(TEST_CAMERA_ID, &all_time, |r| { row = Some(r); Ok(()) }).unwrap(); + db.list_recordings_by_time(TEST_CAMERA_ID, all_time, + |r| { row = Some(r); Ok(()) }).unwrap(); row.unwrap() } } diff --git a/src/upgrade/mod.rs b/src/upgrade/mod.rs new file mode 100644 index 0000000..6345ad4 --- /dev/null +++ b/src/upgrade/mod.rs @@ -0,0 +1,94 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/// Upgrades the database schema. +/// +/// See `guide/schema.md` for more information. + +use db; +use error::Error; +use rusqlite; + +mod v0_to_v1; + +const UPGRADE_NOTES: &'static str = + concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION")); + +const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 1] = [ + v0_to_v1::run, +]; + +fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Error> { + assert!(!requested.contains(';')); // quick check for accidental sql injection. + let actual = conn.query_row(&format!("pragma journal_mode = {}", requested), &[], + |row| row.get_checked::<_, String>(0))??; + info!("...database now in journal_mode {} (requested {}).", actual, requested); + Ok(()) +} + +pub fn run(mut conn: rusqlite::Connection, preset_journal: &str, + no_vacuum: bool) -> Result<(), Error> { + { + assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize); + let old_ver = + conn.query_row("select max(id) from version", &[], |row| row.get_checked(0))??; + if old_ver > db::EXPECTED_VERSION { + return Err(Error::new(format!("Database is at version {}, later than expected {}", + old_ver, db::EXPECTED_VERSION)))?; + } else if old_ver < 0 { + return Err(Error::new(format!("Database is at negative version {}!", old_ver))); + } + info!("Upgrading database from version {} to version {}...", old_ver, db::EXPECTED_VERSION); + set_journal_mode(&conn, preset_journal).unwrap(); + for ver in old_ver .. db::EXPECTED_VERSION { + info!("...from version {} to version {}", ver, ver + 1); + let tx = conn.transaction()?; + UPGRADERS[ver as usize](&tx)?; + tx.execute(r#" + insert into version (id, unix_time, notes) + values (?, cast(strftime('%s', 'now') as int32), ?) + "#, &[&(ver + 1), &UPGRADE_NOTES])?; + tx.commit()?; + } + } + + // WAL is the preferred journal mode for normal operation; it reduces the number of syncs + // without compromising safety. + set_journal_mode(&conn, "wal").unwrap(); + if !no_vacuum { + info!("...vacuuming database after upgrade."); + conn.execute_batch(r#" + pragma page_size = 16384; + vacuum; + "#).unwrap(); + } + info!("...done."); + Ok(()) +} diff --git a/src/upgrade/v0_to_v1.rs b/src/upgrade/v0_to_v1.rs new file mode 100644 index 0000000..cc27119 --- /dev/null +++ b/src/upgrade/v0_to_v1.rs @@ -0,0 +1,235 @@ +// This file is part of Moonfire NVR, a security camera digital video recorder. +// Copyright (C) 2016 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/// Upgrades a version 0 schema to a version 1 schema. + +use db; +use error::Error; +use recording; +use rusqlite; +use std::collections::HashMap; + +pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> { + // These create statements match the schema.sql when version 1 was the latest. + tx.execute_batch(r#" + alter table camera rename to old_camera; + create table camera ( + id integer primary key, + uuid blob unique, + short_name text not null, + description text, + host text, + username text, + password text, + main_rtsp_path text, + sub_rtsp_path text, + retain_bytes integer not null check (retain_bytes >= 0), + next_recording_id integer not null check (next_recording_id >= 0) + ); + alter table recording rename to old_recording; + drop index recording_cover; + create table recording ( + composite_id integer primary key, + camera_id integer not null references camera (id), + run_offset integer not null, + flags integer not null, + sample_file_bytes integer not null check (sample_file_bytes > 0), + start_time_90k integer not null check (start_time_90k > 0), + duration_90k integer not null + check (duration_90k >= 0 and duration_90k < 5*60*90000), + local_time_delta_90k integer not null, + video_samples integer not null check (video_samples > 0), + video_sync_samples integer not null check (video_samples > 0), + video_sample_entry_id integer references video_sample_entry (id), + check (composite_id >> 32 = camera_id) + ); + create index recording_cover on recording ( + start_time_90k, + duration_90k, + video_samples, + video_sample_entry_id, + sample_file_bytes + ); + create table recording_playback ( + composite_id integer primary key references recording (composite_id), + sample_file_uuid blob not null check (length(sample_file_uuid) = 16), + sample_file_sha1 blob not null check (length(sample_file_sha1) = 20), + video_index blob not null check (length(video_index) > 0) + ); + "#)?; + let camera_state = fill_recording(tx).unwrap(); + fill_camera(tx, camera_state).unwrap(); + tx.execute_batch(r#" + drop table old_camera; + drop table old_recording; + "#)?; + Ok(()) +} + +struct CameraState { + /// tuple of (run_start_id, next_start_90k). + current_run: Option<(i64, i64)>, + + /// As in the `next_recording_id` field of the `camera` table. + next_recording_id: i32, +} + +/// Fills the `recording` and `recording_playback` tables from `old_recording`, returning +/// the `camera_state` map for use by a following call to `fill_cameras`. +fn fill_recording(tx: &rusqlite::Transaction) -> Result, Error> { + let mut select = tx.prepare(r#" + select + camera_id, + sample_file_bytes, + start_time_90k, + duration_90k, + local_time_delta_90k, + video_samples, + video_sync_samples, + video_sample_entry_id, + sample_file_uuid, + sample_file_sha1, + video_index + from + old_recording + "#)?; + let mut insert1 = tx.prepare(r#" + insert into recording values (:composite_id, :camera_id, :run_offset, :flags, + :sample_file_bytes, :start_time_90k, :duration_90k, + :local_time_delta_90k, :video_samples, :video_sync_samples, + :video_sample_entry_id) + "#)?; + let mut insert2 = tx.prepare(r#" + insert into recording_playback values (:composite_id, :sample_file_uuid, :sample_file_sha1, + :video_index) + "#)?; + let mut rows = select.query(&[])?; + let mut camera_state: HashMap = HashMap::new(); + while let Some(row) = rows.next() { + let row = row?; + let camera_id: i32 = row.get_checked(0)?; + let camera_state = camera_state.entry(camera_id).or_insert_with(|| { + CameraState{ + current_run: None, + next_recording_id: 1, + } + }); + let composite_id = ((camera_id as i64) << 32) | (camera_state.next_recording_id as i64); + camera_state.next_recording_id += 1; + let sample_file_bytes: i32 = row.get_checked(1)?; + let start_time_90k: i64 = row.get_checked(2)?; + let duration_90k: i32 = row.get_checked(3)?; + let local_time_delta_90k: i64 = row.get_checked(4)?; + let video_samples: i32 = row.get_checked(5)?; + let video_sync_samples: i32 = row.get_checked(6)?; + let video_sample_entry_id: i32 = row.get_checked(7)?; + let sample_file_uuid: Vec = row.get_checked(8)?; + let sample_file_sha1: Vec = row.get_checked(9)?; + let video_index: Vec = row.get_checked(10)?; + let trailing_zero = { + let mut it = recording::SampleIndexIterator::new(); + while it.next(&video_index)? {} + it.duration_90k == 0 + }; + let run_id = match camera_state.current_run { + Some((run_id, expected_start)) if expected_start == start_time_90k => run_id, + _ => composite_id, + }; + insert1.execute_named(&[ + (":composite_id", &composite_id), + (":camera_id", &camera_id), + (":run_offset", &(composite_id - run_id)), + (":flags", &(if trailing_zero { db::RecordingFlags::TrailingZero as i32 } else { 0 })), + (":sample_file_bytes", &sample_file_bytes), + (":start_time_90k", &start_time_90k), + (":duration_90k", &duration_90k), + (":local_time_delta_90k", &local_time_delta_90k), + (":video_samples", &video_samples), + (":video_sync_samples", &video_sync_samples), + (":video_sample_entry_id", &video_sample_entry_id), + ])?; + insert2.execute_named(&[ + (":composite_id", &composite_id), + (":sample_file_uuid", &sample_file_uuid), + (":sample_file_sha1", &sample_file_sha1), + (":video_index", &video_index), + ])?; + camera_state.current_run = if trailing_zero { + None + } else { + Some((run_id, start_time_90k + duration_90k as i64)) + }; + } + Ok(camera_state) +} + +fn fill_camera(tx: &rusqlite::Transaction, camera_state: HashMap) + -> Result<(), Error> { + let mut select = tx.prepare(r#" + select + id, uuid, short_name, description, host, username, password, main_rtsp_path, + sub_rtsp_path, retain_bytes + from + old_camera + "#)?; + let mut insert = tx.prepare(r#" + insert into camera values (:id, :uuid, :short_name, :description, :host, :username, :password, + :main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id) + "#)?; + let mut rows = select.query(&[])?; + while let Some(row) = rows.next() { + let row = row?; + let id: i32 = row.get_checked(0)?; + let uuid: Vec = row.get_checked(1)?; + let short_name: String = row.get_checked(2)?; + let description: String = row.get_checked(3)?; + let host: String = row.get_checked(4)?; + let username: String = row.get_checked(5)?; + let password: String = row.get_checked(6)?; + let main_rtsp_path: String = row.get_checked(7)?; + let sub_rtsp_path: String = row.get_checked(8)?; + let retain_bytes: i64 = row.get_checked(9)?; + insert.execute_named(&[ + (":id", &id), + (":uuid", &uuid), + (":short_name", &short_name), + (":description", &description), + (":host", &host), + (":username", &username), + (":password", &password), + (":main_rtsp_path", &main_rtsp_path), + (":sub_rtsp_path", &sub_rtsp_path), + (":retain_bytes", &retain_bytes), + (":next_recording_id", + &camera_state.get(&id).map(|s| s.next_recording_id).unwrap_or(1)), + ])?; + } + Ok(()) +} diff --git a/src/web.rs b/src/web.rs index 7d7a2da..0b67720 100644 --- a/src/web.rs +++ b/src/web.rs @@ -34,14 +34,16 @@ use core::borrow::Borrow; use core::str::FromStr; use db; use dir::SampleFileDir; -use error::{Error, Result}; +use error::Error; use http_entity; use hyper::{header,server,status}; use hyper::uri::RequestUri; use mime; use mp4; use recording; +use regex::Regex; use serde_json; +use std::cmp; use std::fmt; use std::io::Write; use std::ops::Range; @@ -57,6 +59,11 @@ const DECIMAL_PREFIXES: &'static [&'static str] =&[" ", " k", " M", " G", " T", lazy_static! { static ref JSON: mime::Mime = mime!(Application/Json); static ref HTML: mime::Mime = mime!(Text/Html); + + /// Regex used to parse the `s` query parameter to `view.mp4`. + /// As described in `design/api.md`, this is of the form + /// `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. + static ref SEGMENTS_RE: Regex = Regex::new(r"^(\d+)(-\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap(); } mod json { include!(concat!(env!("OUT_DIR"), "/serde_types.rs")); } @@ -181,18 +188,58 @@ pub struct Handler { dir: Arc, } +#[derive(Debug, Eq, PartialEq)] +struct Segments { + ids: Range, + start_time: i64, + end_time: Option, +} + +impl Segments { + pub fn parse(input: &str) -> Result { + let caps = SEGMENTS_RE.captures(input).ok_or(())?; + let ids_start = i32::from_str(caps.at(1).unwrap()).map_err(|_| ())?; + let ids_end = match caps.at(2) { + Some(e) => i32::from_str(&e[1..]).map_err(|_| ())?, + None => ids_start, + } + 1; + if ids_start < 0 || ids_end <= ids_start { + return Err(()); + } + let start_time = caps.at(3).map_or(Ok(0), i64::from_str).map_err(|_| ())?; + if start_time < 0 { + return Err(()); + } + let end_time = match caps.at(4) { + Some(v) => { + let e = i64::from_str(v).map_err(|_| ())?; + if e <= start_time { + return Err(()); + } + Some(e) + }, + None => None + }; + Ok(Segments{ + ids: ids_start .. ids_end, + start_time: start_time, + end_time: end_time, + }) + } +} + impl Handler { pub fn new(db: Arc, dir: Arc) -> Self { Handler{db: db, dir: dir} } - fn not_found(&self, mut res: server::Response) -> Result<()> { + fn not_found(&self, mut res: server::Response) -> Result<(), Error> { *res.status_mut() = status::StatusCode::NotFound; res.send(b"not found")?; Ok(()) } - fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<()> { + fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<(), Error> { let json = is_json(req); let buf = { let db = self.db.lock(); @@ -207,7 +254,7 @@ impl Handler { Ok(()) } - fn list_cameras_html(&self, db: MutexGuard) -> Result> { + fn list_cameras_html(&self, db: MutexGuard) -> Result, Error> { let mut buf = Vec::new(); buf.extend_from_slice(b"\ \n\ @@ -242,7 +289,7 @@ impl Handler { } fn camera(&self, uuid: Uuid, query: &str, req: &server::Request, mut res: server::Response) - -> Result<()> { + -> Result<(), Error> { let json = is_json(req); let buf = { let db = self.db.lock(); @@ -260,7 +307,7 @@ impl Handler { } fn camera_html(&self, db: MutexGuard, query: &str, - uuid: Uuid) -> Result> { + uuid: Uuid) -> Result, Error> { let r = Handler::get_optional_range(query)?; let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; @@ -290,26 +337,30 @@ impl Handler { // parameters between recordings. static FORCE_SPLIT_DURATION: recording::Duration = recording::Duration(60 * 60 * recording::TIME_UNITS_PER_SEC); - db.list_aggregated_recordings(camera.id, &r, FORCE_SPLIT_DURATION, |row| { - let seconds = (row.range.end.0 - row.range.start.0) / recording::TIME_UNITS_PER_SEC; + let mut rows = Vec::new(); + db.list_aggregated_recordings(camera.id, r, FORCE_SPLIT_DURATION, |row| { + rows.push(row.clone()); + Ok(()) + })?; + rows.sort_by(|r1, r2| r1.time.start.cmp(&r2.time.start)); + for row in &rows { + let seconds = (row.time.end.0 - row.time.start.0) / recording::TIME_UNITS_PER_SEC; write!(&mut buf, "\ - {}\ + {}\ {}{}x{}{:.0}{:b}B{}bps\n", - row.range.start.0, row.range.end.0, - HumanizedTimestamp(Some(row.range.start)), - HumanizedTimestamp(Some(row.range.end)), row.video_sample_entry.width, + row.ids.start, row.ids.end - 1, HumanizedTimestamp(Some(row.time.start)), + HumanizedTimestamp(Some(row.time.end)), row.video_sample_entry.width, row.video_sample_entry.height, if seconds == 0 { 0. } else { row.video_samples as f32 / seconds as f32 }, Humanized(row.sample_file_bytes), Humanized(if seconds == 0 { 0 } else { row.sample_file_bytes * 8 / seconds }))?; - Ok(()) - })?; + }; buf.extend_from_slice(b"\n\n"); Ok(buf) } fn camera_recordings(&self, uuid: Uuid, query: &str, req: &server::Request, - mut res: server::Response) -> Result<()> { + mut res: server::Response) -> Result<(), Error> { let r = Handler::get_optional_range(query)?; if !is_json(req) { *res.status_mut() = status::StatusCode::NotAcceptable; @@ -321,11 +372,11 @@ impl Handler { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; - db.list_aggregated_recordings(camera.id, &r, recording::Duration(i64::max_value()), + db.list_aggregated_recordings(camera.id, r, recording::Duration(i64::max_value()), |row| { out.recordings.push(json::Recording{ - start_time_90k: row.range.start.0, - end_time_90k: row.range.end.0, + start_time_90k: row.time.start.0, + end_time_90k: row.time.end.0, sample_file_bytes: row.sample_file_bytes, video_samples: row.video_samples, video_sample_entry_width: row.video_sample_entry.width, @@ -342,79 +393,90 @@ impl Handler { } fn camera_view_mp4(&self, uuid: Uuid, query: &str, req: &server::Request, - res: server::Response) -> Result<()> { + res: server::Response) -> Result<(), Error> { let camera_id = { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| Error::new("no such camera".to_owned()))?; camera.id }; - let mut start = None; - let mut end = None; - let mut include_ts = false; + let mut builder = mp4::Mp4FileBuilder::new(); for (key, value) in form_urlencoded::parse(query.as_bytes()) { let (key, value) = (key.borrow(), value.borrow()); match key { - "start_time_90k" => start = Some(recording::Time(i64::from_str(value)?)), - "end_time_90k" => end = Some(recording::Time(i64::from_str(value)?)), - "ts" => { include_ts = value == "true"; }, - _ => {}, + "s" => { + let s = Segments::parse(value).map_err( + |_| Error::new(format!("invalid s parameter: {}", value)))?; + debug!("camera_view_mp4: appending s={:?}", s); + let mut est_segments = (s.ids.end - s.ids.start) as usize; + if let Some(end) = s.end_time { + // There should be roughly ceil((end - start) / desired_recording_duration) + // recordings in the desired timespan if there are no gaps or overlap, + // possibly another for misalignment of the requested timespan with the + // rotate offset and another because rotation only happens at key frames. + let ceil_durations = (end - s.start_time + + recording::DESIRED_RECORDING_DURATION - 1) / + recording::DESIRED_RECORDING_DURATION; + est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize); + } + builder.reserve(est_segments); + let db = self.db.lock(); + let mut prev = None; + let mut cur_off = 0; + db.list_recordings_by_id(camera_id, s.ids.clone(), |r| { + // Check for missing recordings. + match prev { + None if r.id == s.ids.start => {}, + None => return Err(Error::new(format!("no such recording {}/{}", + camera_id, s.ids.start))), + Some(id) if r.id != id + 1 => { + return Err(Error::new(format!("no such recording {}/{}", + camera_id, id + 1))); + }, + _ => {}, + }; + prev = Some(r.id); + + // Add a segment for the relevant part of the recording, if any. + let end_time = s.end_time.unwrap_or(i64::max_value()); + let d = r.duration_90k as i64; + if s.start_time <= cur_off + d && cur_off < end_time { + let start = cmp::max(0, s.start_time - cur_off); + let end = cmp::min(d, end_time - cur_off); + let times = start as i32 .. end as i32; + debug!("...appending recording {}/{} with times {:?} (out of dur {})", + r.camera_id, r.id, times, d); + builder.append(&db, r, start as i32 .. end as i32)?; + } else { + debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d); + } + cur_off += d; + Ok(()) + })?; + + // Check for missing recordings. + match prev { + Some(id) if s.ids.end != id + 1 => { + return Err(Error::new(format!("no such recording {}/{}", + camera_id, s.ids.end - 1))); + }, + None => { + return Err(Error::new(format!("no such recording {}/{}", + camera_id, s.ids.start))); + }, + _ => {}, + }; + if let Some(end) = s.end_time { + if end > cur_off { + return Err(Error::new( + format!("end time {} is beyond specified recordings", end))); + } + } + }, + "ts" => builder.include_timestamp_subtitle_track(value == "true"), + _ => return Err(Error::new(format!("parameter {} not understood", key))), } }; - let start = start.ok_or_else(|| Error::new("start_time_90k missing".to_owned()))?; - let end = end.ok_or_else(|| Error::new("end_time_90k missing".to_owned()))?; - let desired_range = start .. end; - let mut builder = mp4::Mp4FileBuilder::new(); - - // There should be roughly ceil((end - start) / desired_recording_duration) recordings - // in the desired timespan if there are no gaps or overlap. Add a couple more to be safe: - // one for misalignment of the requested timespan with the rotate offset, another because - // rotation only happens at key frames. - let ceil_durations = ((end - start).0 + recording::DESIRED_RECORDING_DURATION - 1) / - recording::DESIRED_RECORDING_DURATION; - let est_records = (ceil_durations + 2) as usize; - let mut next_start = start; - builder.reserve(est_records); - { - let db = self.db.lock(); - db.list_recordings(camera_id, &desired_range, |r| { - if builder.len() == 0 && r.start > next_start { - return Err(Error::new(format!("recording started late ({} vs requested {})", - r.start, start))); - } else if builder.len() != 0 && r.start != next_start { - return Err(Error::new(format!("gap/overlap in recording: {} to {} after row {}", - next_start, r.start, builder.len()))); - } - next_start = r.start + recording::Duration(r.duration_90k as i64); - // TODO: check for inconsistent video sample entries. - - let rel_start = if r.start < start { - (start - r.start).0 as i32 - } else { - 0 - }; - let rel_end = if r.start + recording::Duration(r.duration_90k as i64) > end { - (end - r.start).0 as i32 - } else { - r.duration_90k - }; - builder.append(&db, r, rel_start .. rel_end)?; - Ok(()) - })?; - } - if next_start < end { - return Err(Error::new(format!( - "recording ends early: {}, not requested: {} after {} rows.", - next_start, end, builder.len()))) - } - if builder.len() > est_records { - warn!("Estimated {} records for time [{}, {}); actually were {}", - est_records, start, end, builder.len()); - } else { - debug!("Estimated {} records for time [{}, {}); actually were {}", - est_records, start, end, builder.len()); - } - builder.include_timestamp_subtitle_track(include_ts); let mp4 = builder.build(self.db.clone(), self.dir.clone())?; http_entity::serve(&mp4, req, res)?; Ok(()) @@ -422,7 +484,7 @@ impl Handler { /// Parses optional `start_time_90k` and `end_time_90k` query parameters, defaulting to the /// full range of possible values. - fn get_optional_range(query: &str) -> Result> { + fn get_optional_range(query: &str) -> Result, Error> { let mut start = i64::min_value(); let mut end = i64::max_value(); for (key, value) in form_urlencoded::parse(query.as_bytes()) { @@ -455,10 +517,12 @@ impl server::Handler for Handler { #[cfg(test)] mod tests { - use super::{HtmlEscaped, Humanized}; + use super::{HtmlEscaped, Humanized, Segments}; + use testutil; #[test] fn test_humanize() { + testutil::init(); assert_eq!("1.0 B", format!("{:b}B", Humanized(1))); assert_eq!("1.0 EiB", format!("{:b}B", Humanized(1i64 << 60))); assert_eq!("1.5 EiB", format!("{:b}B", Humanized((1i64 << 60) + (1i64 << 59)))); @@ -468,8 +532,30 @@ mod tests { #[test] fn test_html_escaped() { + testutil::init(); assert_eq!("", format!("{}", HtmlEscaped(""))); assert_eq!("no special chars", format!("{}", HtmlEscaped("no special chars"))); assert_eq!("a <tag> & text", format!("{}", HtmlEscaped("a & text"))); } + + #[test] + fn test_segments() { + testutil::init(); + assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: None}, + Segments::parse("1").unwrap()); + assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: None}, + Segments::parse("1.26-").unwrap()); + assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: Some(42)}, + Segments::parse("1.-42").unwrap()); + assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: Some(42)}, + Segments::parse("1.26-42").unwrap()); + assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: None}, + Segments::parse("1-5").unwrap()); + assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: None}, + Segments::parse("1-5.26-").unwrap()); + assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: Some(42)}, + Segments::parse("1-5.-42").unwrap()); + assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: Some(42)}, + Segments::parse("1-5.26-42").unwrap()); + } }