schema version 1

The advantages of the new schema are:

* overlapping recordings can be unambiguously described and viewed.
  Overlap is a significant problem right now: the clock on my cameras
  appears to run faster than the (NTP-synchronized) clock on my NVR, so
  if an RTSP session drops and is quickly reconnected, there's likely to
  be overlap.

* less I/O is required to view mp4s when there are multiple cameras.
  With `pragma page_size = 1024`, this cuts the number of database read
  syscalls dramatically (605 -> 39 in one test), although I'm not sure
  how much of that maps to actual I/O wait time. The difference is
  probably so dramatic because of overflow page chaining. But even with
  larger page sizes there's an improvement: it helps to stop
  interleaving the video_index fields from different cameras, as
  sketched below.
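
The gain comes from keying recordings by a 64-bit composite id, with the
camera id in the high 32 bits and the per-camera recording id in the low 32
(see `composite_id()` in src/db.rs below). A minimal sketch of why sorting by
this key clusters each camera's rows (the `main` demo is illustrative, not
part of the commit):

```rust
// Matches composite_id() in src/db.rs: camera id in the high 32 bits,
// per-camera recording id in the low 32 bits.
fn composite_id(camera_id: i32, recording_id: i32) -> i64 {
    (camera_id as i64) << 32 | recording_id as i64
}

fn main() {
    let mut keys = vec![
        composite_id(2, 0), composite_id(1, 1),
        composite_id(1, 0), composite_id(2, 1),
    ];
    keys.sort();
    // All of camera 1's recordings (and thus its video_index data, in the
    // new recording_playback table) sort ahead of camera 2's rather than
    // interleaving by start time.
    assert_eq!(keys, vec![
        composite_id(1, 0), composite_id(1, 1),
        composite_id(2, 0), composite_id(2, 1),
    ]);
}
```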

There are changes to the JSON API to take advantage of this, described
in design/api.md.

There's an upgrade procedure, described in guide/schema.md.
Scott Lamb 2016-12-20 22:08:18 -08:00
parent fee4141dc6
commit eee887b9a6
14 changed files with 1121 additions and 343 deletions


@ -17,7 +17,8 @@ support for motion detection, no authentication, and no config UI.
This is version 0.1, the initial release. Until version 1.0, there will be no
compatibility guarantees: configuration and storage formats may change from
version to version.
version to version. There is an [upgrade procedure](guide/schema.md) but it is
not for the faint of heart.
I hope to add features such as salient motion detection. It's way too early to
make promises, but it seems possible to build a full-featured
@ -209,11 +210,12 @@ each camera you insert using this method.
$ sudo -u moonfire-nvr sqlite3 ~moonfire-nvr/db/db
sqlite3> insert into camera (
...> uuid, short_name, description, host, username, password,
...> main_rtsp_path, sub_rtsp_path, retain_bytes) values (
...> main_rtsp_path, sub_rtsp_path, retain_bytes,
...> next_recording_id) values (
...> X'b47f48706d91414591cd6c931bf836b4', 'driveway',
...> 'Longer description of this camera', '192.168.1.101',
...> 'admin', '12345', '/Streaming/Channels/1',
...> '/Streaming/Channels/2', 104857600);
...> '/Streaming/Channels/2', 104857600, 0);
sqlite3> ^D
### Using automatic camera configuration inclusion with `prep.sh`
@ -230,7 +232,7 @@ but can if you wish and know what you are doing):
short_name, description,
host, username, password,
main_rtsp_path, sub_rtsp_path,
retain_bytes
retain_bytes, next_recording_id
)
values
(
@ -239,7 +241,7 @@ but can if you wish and know what you are doing):
'192.168.1.41',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000
346870912000, 0
),
(
X'da5921f493ac4279aafe68e69e174026',
@ -247,7 +249,7 @@ but can if you wish and know what you are doing):
'192.168.1.42',
'admin', 'secret',
'/Streaming/Channels/1', '/Streaming/Channels/2',
346870912000
346870912000, 0
);
You'll still have to find the correct rtsp paths, usernames and passwords, and


@ -125,31 +125,28 @@ Valid request parameters:
TODO(slamb): once we support annotations, should they be included in the same
URI or as a separate `/annotations`?
TODO(slamb): There might be some irregularity in the order if there are
overlapping recordings (such as if the server's clock jumped while running)
but I haven't thought about the details. In general, I'm not really sure how
to handle this case, other than ideally to keep recording stuff no matter what
and present some UI to help the user to fix it after the
fact.
In the property `recordings`, returns a list of recordings in arbitrary order.
Each recording object has the following properties:
In the property `recordings`, returns a list of recordings. Each recording
object has the following properties:
* `start_time_90k`
* `end_time_90k`
* `start_id`. The id of this recording, which can be used with `/view.mp4`
to retrieve its content.
* `end_id` (optional). If absent, this object describes a single recording.
If present, this indicates that recordings `start_id-end_id` (inclusive)
together are as described. Adjacent recordings from the same RTSP session
may be coalesced in this fashion to reduce the amount of redundant data
transferred.
* `start_time_90k`: the start time of the given recording. Note this may be
less than the requested `start_time_90k` if this recording was ongoing
at the requested time.
* `end_time_90k`: the end time of the given recording. Note this may be
greater than the requested `end_time_90k` if this recording was ongoing at
the requested time.
* `sample_file_bytes`
* `video_sample_entry_sha1`
* `video_sample_entry_width`
* `video_sample_entry_height`
* `video_samples`: the number of samples (aka frames) of video in this
recording.
* TODO: recording id(s)? interior split points for coalesced recordings?
Recordings may be coalesced if they are adjacent and have the same
`video_sample_entry_*` data. That is, if recording A spans times [t, u) and
recording B spans times [u, v), they may be returned as a single recording
AB spanning times [t, v). Arbitrarily many recordings may be coalesced in this
fashion.
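
A hedged sketch of the coalescing test (illustrative types and names, not the
server's actual code; the real logic lives in `list_aggregated_recordings` in
src/db.rs below):

```rust
use std::ops::Range;

// Illustrative stand-ins for the aggregate being built and the next row.
struct Agg { time: Range<i64>, ids: Range<i32>, video_sample_entry_id: i32 }
struct Row { id: i32, start: i64, video_sample_entry_id: i32 }

// A row can be folded into an aggregate only if it directly continues it:
// the next id in sequence, starting exactly where the aggregate ends, with
// the same video_sample_entry.
fn can_coalesce(a: &Agg, row: &Row) -> bool {
    a.ids.end == row.id &&
        a.time.end == row.start &&
        a.video_sample_entry_id == row.video_sample_entry_id
}
```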
Example request URI (with added whitespace between parameters):
@ -165,8 +162,9 @@ Example response:
{
"recordings": [
{
"end_time_90k": 130985466591817,
"start_id": 1,
"start_time_90k": 130985461191810,
"end_time_90k": 130985466591817,
"sample_file_bytes": 8405564,
"video_sample_entry_sha1": "81710c9c51a02cc95439caa8dd3bc12b77ffe767",
"video_sample_entry_width": 1280,
@ -184,18 +182,38 @@ Example response:
### `/camera/<uuid>/view.mp4`
A GET returns a .mp4 file, with an etag and support for range requests.
A GET returns a `.mp4` file, with an etag and support for range requests.
Expected query parameters:
* `start_time_90k`
* `end_time_90k`
* `ts`: should be set to `true` to request a subtitle track be added with
human-readable recording timestamps.
* TODO(slamb): possibly `overlap` to indicate what to do about segments of
recording with overlapping wall times. Values might include:
* `error` (return an HTTP error)
* `include_all` (include all, in order of the recording ids)
* `include_latest` (include only the latest by recording id for a
particular segment of time)
* TODO(slamb): gaps allowed or not? maybe a parameter for this also?
* `s` (one or more): a string of the form
`START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`. This specifies
recording segments to include. The produced `.mp4` file will be a
concatenation of the segments indicated by all `s` parameters. The ids to
retrieve are as returned by the `/recordings` URL. The optional start and
end times are in 90k units and relative to the start of the first specified
id. These can be used to clip the returned segments. Note they can be used
to skip over some ids entirely; this is allowed so that the caller doesn't
need to know the start time of each interior id.
* `ts` (optional): should be set to `true` to request a subtitle track be
added with human-readable recording timestamps.
Example request URI to retrieve all of recording id 1 from the given camera:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1
```
Example request URI to retrieve all of recording ids 1-5 from the given camera,
with timestamp subtitles:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1-5&ts=true
```
Example request URI to retrieve recording id 1, skipping its first 26
90,000ths of a second:
```
/camera/fd20f7a2-9d69-4cb3-94ed-d51a20c3edfe/view.mp4?s=1.26-
```
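
The `s` grammar above is simple to hand-parse; here's a hedged sketch (the
`Segments` type and `parse_segments` name are illustrative, not the server's
actual parser):

```rust
use std::ops::Range;

/// Illustrative parsed form of one `s` parameter.
struct Segments {
    ids: Range<i32>,        // start id .. end id + 1
    start_time: i64,        // 90k units relative to the start of the first id
    end_time: Option<i64>,  // None: play through the end of the last id
}

/// Parses `START_ID[-END_ID][.[REL_START]-[REL_END]]`, e.g. "1", "1-5",
/// or "1.26-"; returns None on malformed input.
fn parse_segments(s: &str) -> Option<Segments> {
    // Split off the optional ".[REL_START]-[REL_END]" time suffix.
    let (ids, times) = match s.find('.') {
        Some(i) => (&s[..i], Some(&s[i + 1..])),
        None => (s, None),
    };
    // Parse "START_ID" or "START_ID-END_ID".
    let (start_id, end_id): (i32, i32) = match ids.find('-') {
        Some(i) => (ids[..i].parse().ok()?, ids[i + 1..].parse().ok()?),
        None => {
            let id = ids.parse().ok()?;
            (id, id)
        },
    };
    if end_id < start_id {
        return None;
    }
    // Both times are optional, but the '-' between them is not.
    let (start_time, end_time) = match times {
        None => (0, None),
        Some(t) => {
            let i = t.find('-')?;
            let start = if i == 0 { 0 } else { t[..i].parse().ok()? };
            let end = if i + 1 == t.len() { None } else { Some(t[i + 1..].parse().ok()?) };
            (start, end)
        },
    };
    Some(Segments { ids: start_id..end_id + 1, start_time, end_time })
}
```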


@ -15,33 +15,132 @@ The database schema includes a version number to quickly identify if a
database is compatible with a particular version of the software. Some
software upgrades will require you to upgrade the database.
### Unversioned to version 0
Note that in general upgrades are one-way and backward-incompatible. That is,
you can't downgrade the database to the old version, and you can't run the old
software on the new database. To minimize the corresponding risk, you should
save a backup of the old SQLite database and verify the new software works in
read-only mode prior to deleting the old database.
Early versions of Moonfire NVR did not include the version information in the
schema. You can manually add this information to your schema using the
`sqlite3` commandline. This process is backward compatible, meaning that
software versions that accept an unversioned database will also accept a
version 0 database.
### Procedure
Version 0 makes two changes:
First ensure there is sufficient space available for three copies of the
SQLite database:
* schema versioning, as described above.
* adding a column (`video_sync_samples`) to a database index to speed up
certain operations.
1. the primary copy, which will be upgraded
2. a copy you create manually as a backup so that you can restore if you
   discover a problem while running the new software against the upgraded
   database in read-only mode. If disk space is tight, you can save this
   to a different filesystem than the primary copy.
3. an internal copy made and destroyed by Moonfire NVR and SQLite during the
   upgrade:
   * a write-ahead log or rollback journal during earlier stages
   * a complete database copy during the final vacuum step
If disk space is tight, and you are _very careful_, you can skip these
copies with the `--preset-journal=off --no-vacuum` arguments to
the updater. If you aren't confident in your ability to do this, *don't
do it*. If you are confident, take additional safety precautions anyway:
* double-check you have the full backup described above. Without the
journal any problems during the upgrade will corrupt your database
and you will need to restore.
* ensure you re-enable journalling via `pragma journal_mode = wal;`
before using the upgraded database, or any problems after the
upgrade will corrupt your database. The upgrade procedure should do
this automatically, but you will want to verify by hand that you are
no longer in the dangerous mode.
First ensure Moonfire NVR is not running; if you are using systemd with the
Next ensure Moonfire NVR is not running and does not automatically restart if
the system is rebooted during the upgrade. If you are using systemd with the
service name `moonfire-nvr`, you can do this as follows:
$ sudo systemctl stop moonfire-nvr
$ sudo systemctl disable moonfire-nvr
The service takes a moment to shut down; wait until the following command
reports that it is not running:
$ sudo systemctl status moonfire-nvr
Then use `sqlite3` to manually edit the database. The default path is
`/var/lib/moonfire-nvr/db/db`; if you've specified a different `--db_dir`,
use that directory with a suffix of `/db`.
Then back up your SQLite database. If you are using the default path, you can
do so as follows:
$ sudo -u moonfire-nvr cp /var/lib/moonfire-nvr/db/db{,.pre-upgrade}
By default, the upgrade command will reset the SQLite `journal_mode` to
`delete` prior to the upgrade. This works around a problem with
`journal_mode = wal` in older SQLite versions, as documented in [the SQLite
manual for write-ahead logging](https://www.sqlite.org/wal.html):
> WAL works best with smaller transactions. WAL does not work well for very
> large transactions. For transactions larger than about 100 megabytes,
> traditional rollback journal modes will likely be faster. For transactions
> in excess of a gigabyte, WAL mode may fail with an I/O or disk-full error.
> It is recommended that one of the rollback journal modes be used for
> transactions larger than a few dozen megabytes. Beginning with version
> 3.11.0 (2016-02-15), WAL mode works as efficiently with large transactions
> as does rollback mode.
Run the upgrade procedure using the new software binary (here referred to as
`new-moonfire-nvr`; if you are installing from source, you may find it as
`target/release/moonfire-nvr`).
$ sudo -u moonfire-nvr RUST_LOG=info new-moonfire-nvr --upgrade
Then run the system in read-only mode to verify correct operation:
$ sudo -u moonfire-nvr new-moonfire-nvr --read-only
Go to the web interface and ensure the system is operating correctly. If
you detect a problem now, you can copy the old database back over the new one.
If you detect a problem after enabling read-write operation, a restore will be
more complicated.
Then install the new software to the path expected by your systemd
configuration and start it up:
$ sudo install -m 755 new-moonfire-nvr /usr/local/bin/moonfire-nvr
$ sudo systemctl enable moonfire-nvr
$ sudo systemctl start moonfire-nvr
Hopefully your system is functioning correctly. If not, there are two options
for restore; neither is easy:
* go back to your old database. There will be two classes of problems:
* If the new system deleted any recordings, the old system will
incorrectly believe they are still present. You could wait until all
existing files are rotated away, or you could try to delete them
manually from the database.
* if the new system created any recordings, the old system will not
know about them and will not delete them. Your disk may become full.
You should find some way to discover these files and manually delete
them.
Once you're confident of correct operation, delete the unneeded backup:
$ sudo rm /var/lib/moonfire-nvr/db/db.pre-upgrade
### Unversioned to version 0
Early versions of Moonfire NVR (prior to 2016-12-20) did not include the
version information in the schema. You can manually add this information to
your schema using the `sqlite3` commandline. This process is backward
compatible, meaning that software versions that accept an unversioned database
will also accept a version 0 database.
Version 0 makes two changes:
* it adds schema versioning, as described above.
* it adds a column (`video_sync_samples`) to a database index to speed up
certain operations.
There's a special procedure for this upgrade. The good news is that a backup
is unnecessary; there's no risk with this procedure.
First ensure Moonfire NVR is not running as described in the general procedure
above.
Then use `sqlite3` to manually edit the database. The default
path is `/var/lib/moonfire-nvr/db/db`; if you've specified a different
`--db_dir`, use that directory with a suffix of `/db`.
$ sudo -u moonfire-nvr sqlite3 /var/lib/moonfire-nvr/db/db
sqlite3>
@ -74,6 +173,15 @@ create index recording_cover on recording (
commit transaction;
```
When you are done, you can restart the service:
When you are done, you can restart the service via `systemctl` and continue
using it with your existing or new version of Moonfire NVR.
$ sudo systemctl start moonfire-nvr
### Version 0 to version 1
Version 1 makes several changes to the recording tables and indices. These
changes allow overlapping recordings to be unambiguously listed and viewed.
They also reduce the amount of I/O; in one test of retrieving playback
indexes, the number of (mostly 1024-byte) read syscalls on the database
dropped from 605 to 39.
The general upgrade procedure applies to this upgrade.

src/db.rs

@ -49,7 +49,7 @@
//! and such to avoid database operations in these paths.
//!
//! * the `Transaction` interface allows callers to batch write operations to reduce latency and
//! SSD write samples.
//! SSD write cycles.
use error::{Error, ResultExt};
use fnv;
@ -70,17 +70,24 @@ use time;
use uuid::Uuid;
/// Expected schema version. See `guide/schema.md` for more information.
pub const EXPECTED_VERSION: i32 = 0;
pub const EXPECTED_VERSION: i32 = 1;
const GET_RECORDING_SQL: &'static str =
"select sample_file_uuid, video_index from recording where id = :id";
const GET_RECORDING_PLAYBACK_SQL: &'static str = r#"
select
sample_file_uuid,
video_index
from
recording_playback
where
composite_id = :composite_id
"#;
const DELETE_RESERVATION_SQL: &'static str =
"delete from reserved_sample_files where uuid = :uuid";
const INSERT_RESERVATION_SQL: &'static str = r#"
insert into reserved_sample_files (uuid, state)
values (:uuid, :state);
values (:uuid, :state)
"#;
/// Valid values for the `state` column in the `reserved_sample_files` table.
@ -96,38 +103,50 @@ enum ReservationState {
const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
insert into video_sample_entry (sha1, width, height, data)
values (:sha1, :width, :height, :data);
values (:sha1, :width, :height, :data)
"#;
const INSERT_RECORDING_SQL: &'static str = r#"
insert into recording (camera_id, sample_file_bytes, start_time_90k,
duration_90k, local_time_delta_90k, video_samples,
video_sync_samples, video_sample_entry_id,
sample_file_uuid, sample_file_sha1, video_index)
values (:camera_id, :sample_file_bytes, :start_time_90k,
:duration_90k, :local_time_delta_90k,
:video_samples, :video_sync_samples,
:video_sample_entry_id, :sample_file_uuid,
:sample_file_sha1, :video_index);
insert into recording (composite_id, camera_id, run_offset, flags, sample_file_bytes,
start_time_90k, duration_90k, local_time_delta_90k, video_samples,
video_sync_samples, video_sample_entry_id)
values (:composite_id, :camera_id, :run_offset, :flags, :sample_file_bytes,
:start_time_90k, :duration_90k, :local_time_delta_90k,
:video_samples, :video_sync_samples, :video_sample_entry_id)
"#;
const INSERT_RECORDING_PLAYBACK_SQL: &'static str = r#"
insert into recording_playback (composite_id, sample_file_uuid, sample_file_sha1, video_index)
values (:composite_id, :sample_file_uuid, :sample_file_sha1,
:video_index)
"#;
const UPDATE_NEXT_RECORDING_ID_SQL: &'static str =
"update camera set next_recording_id = :next_recording_id where id = :camera_id";
const LIST_OLDEST_SAMPLE_FILES_SQL: &'static str = r#"
select
id,
sample_file_uuid,
start_time_90k,
duration_90k,
sample_file_bytes
recording.composite_id,
recording_playback.sample_file_uuid,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes
from
recording
join recording_playback on (recording.composite_id = recording_playback.composite_id)
where
camera_id = :camera_id
:start <= recording.composite_id and
recording.composite_id < :end
order by
start_time_90k
recording.composite_id
"#;
const DELETE_RECORDING_SQL: &'static str = r#"
delete from recording where id = :recording_id;
delete from recording where composite_id = :composite_id
"#;
const DELETE_RECORDING_PLAYBACK_SQL: &'static str = r#"
delete from recording_playback where composite_id = :composite_id
"#;
const CAMERA_MIN_START_SQL: &'static str = r#"
@ -137,7 +156,7 @@ const CAMERA_MIN_START_SQL: &'static str = r#"
recording
where
camera_id = :camera_id
order by start_time_90k limit 1;
order by start_time_90k limit 1
"#;
const CAMERA_MAX_START_SQL: &'static str = r#"
@ -151,6 +170,26 @@ const CAMERA_MAX_START_SQL: &'static str = r#"
order by start_time_90k desc;
"#;
const LIST_RECORDINGS_BY_ID_SQL: &'static str = r#"
select
recording.composite_id,
recording.run_offset,
recording.flags,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes,
recording.video_samples,
recording.video_sync_samples,
recording.video_sample_entry_id
from
recording
where
:start <= composite_id and
composite_id < :end
order by
recording.composite_id
"#;
/// A concrete box derived from an ISO/IEC 14496-12 section 8.5.2 VisualSampleEntry box. Describes
/// the codec, width, height, etc.
#[derive(Debug)]
@ -162,42 +201,56 @@ pub struct VideoSampleEntry {
pub data: Vec<u8>,
}
/// A row used in `list_recordings`.
/// A row used in `list_recordings_by_time` and `list_recordings_by_id`.
#[derive(Debug)]
pub struct ListCameraRecordingsRow {
pub id: i64,
pub struct ListRecordingsRow {
pub start: recording::Time,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub camera_id: i32,
pub id: i32,
/// This is a recording::Duration, but a single recording's duration fits into an i32.
pub duration_90k: i32,
pub video_samples: i32,
pub video_sync_samples: i32,
pub sample_file_bytes: i32,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub run_offset: i32,
pub flags: i32,
}
/// A row used in `list_aggregated_recordings`.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ListAggregatedRecordingsRow {
pub range: Range<recording::Time>,
pub time: Range<recording::Time>,
pub ids: Range<i32>,
pub video_samples: i64,
pub video_sync_samples: i64,
pub sample_file_bytes: i64,
pub video_sample_entry: Arc<VideoSampleEntry>,
pub camera_id: i32,
pub flags: i32,
pub run_start_id: i32,
}
/// Extra data about a recording, beyond what is returned by ListCameraRecordingsRow.
/// Retrieve with `get_recording`.
/// Select fields from the `recording_playback` table. Retrieve with `get_recording_playback`.
#[derive(Debug)]
pub struct ExtraRecording {
pub struct RecordingPlayback {
pub sample_file_uuid: Uuid,
pub video_index: Vec<u8>
}
/// Bitmask in the `flags` field in the `recordings` table; see `schema.sql`.
pub enum RecordingFlags {
TrailingZero = 1,
}
/// A recording to pass to `insert_recording`.
#[derive(Debug)]
pub struct RecordingToInsert {
pub camera_id: i32,
pub run_offset: i32,
pub flags: i32,
pub sample_file_bytes: i32,
pub time: Range<recording::Time>,
pub local_time: recording::Time,
@ -214,7 +267,7 @@ pub struct RecordingToInsert {
pub struct ListOldestSampleFilesRow {
pub uuid: Uuid,
pub camera_id: i32,
pub recording_id: i64,
pub recording_id: i32,
pub time: Range<recording::Time>,
pub sample_file_bytes: i32,
}
@ -285,6 +338,8 @@ pub struct Camera {
/// Mapping of calendar day (in the server's time zone) to a summary of recordings on that day.
pub days: BTreeMap<CameraDayKey, CameraDayValue>,
next_recording_id: i32,
}
/// Adds `delta` to the day represented by `day` in the map `m`.
@ -431,8 +486,8 @@ struct State {
cameras_by_id: BTreeMap<i32, Camera>,
cameras_by_uuid: BTreeMap<Uuid, i32>,
video_sample_entries: BTreeMap<i32, Arc<VideoSampleEntry>>,
list_recordings_sql: String,
recording_cache: RefCell<LruCache<i64, Arc<ExtraRecording>, fnv::FnvBuildHasher>>,
list_recordings_by_time_sql: String,
playback_cache: RefCell<LruCache<i64, Arc<RecordingPlayback>, fnv::FnvBuildHasher>>,
}
/// A high-level transaction. This manages the SQLite transaction and the matching modification to
@ -443,10 +498,9 @@ pub struct Transaction<'a> {
tx: rusqlite::Transaction<'a>,
/// True if due to an earlier error the transaction must be rolled back rather than committed.
/// Insert and delete are two-part, requiring a delete from the `reserve_sample_files` table
/// and an insert to the `recording` table (or vice versa). If the latter half fails, the
/// former should be aborted as well. We could use savepoints (nested transactions) for this,
/// but for simplicity we just require the entire transaction be rolled back.
/// Insert and delete are multi-part. If later parts fail, earlier parts should be aborted as
/// well. We could use savepoints (nested transactions) for this, but for simplicity we just
/// require the entire transaction be rolled back.
must_rollback: bool,
/// Normally sample file uuids must be reserved prior to a recording being inserted.
@ -470,6 +524,13 @@ struct CameraModification {
/// Reset the Camera range to this value. This should be populated immediately prior to the
/// commit.
range: Option<Range<recording::Time>>,
/// Reset the next_recording_id to the specified value.
new_next_recording_id: Option<i32>,
}
fn composite_id(camera_id: i32, recording_id: i32) -> i64 {
(camera_id as i64) << 32 | recording_id as i64
}
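
/// Illustrative inverse of `composite_id` (not part of this commit; reading
/// paths such as `list_oldest_sample_files` below inline this decomposition).
fn decompose_composite_id(composite_id: i64) -> (i32, i32) {
    // High 32 bits: camera id. Low 32 bits: recording id (the truncating
    // `as i32` cast keeps only the low half).
    ((composite_id >> 32) as i32, composite_id as i32)
}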
impl<'a> Transaction<'a> {
@ -486,21 +547,28 @@ impl<'a> Transaction<'a> {
Ok(uuid)
}
/// Deletes the given recordings from the `recording` table.
/// Deletes the given recordings from the `recording` and `recording_playback` tables.
/// Note they are not fully removed from the database; the uuids are transferred to the
/// `reserved_sample_files` table. The caller should `unlink` the files, then remove the
/// reservation.
pub fn delete_recordings(&mut self, rows: &[ListOldestSampleFilesRow]) -> Result<(), Error> {
let mut del = self.tx.prepare_cached(DELETE_RECORDING_SQL)?;
let mut del1 = self.tx.prepare_cached(DELETE_RECORDING_SQL)?;
let mut del2 = self.tx.prepare_cached(DELETE_RECORDING_PLAYBACK_SQL)?;
let mut insert = self.tx.prepare_cached(INSERT_RESERVATION_SQL)?;
self.check_must_rollback()?;
self.must_rollback = true;
for row in rows {
let changes = del.execute_named(&[(":recording_id", &row.recording_id)])?;
let composite_id = &composite_id(row.camera_id, row.recording_id);
let changes = del1.execute_named(&[(":composite_id", composite_id)])?;
if changes != 1 {
return Err(Error::new(format!("no such recording {} (camera {}, uuid {})",
row.recording_id, row.camera_id, row.uuid)));
return Err(Error::new(format!("no such recording {}/{} (uuid {})",
row.camera_id, row.recording_id, row.uuid)));
}
let changes = del2.execute_named(&[(":composite_id", composite_id)])?;
if changes != 1 {
return Err(Error::new(format!("no such recording_playback {}/{} (uuid {})",
row.camera_id, row.recording_id, row.uuid)));
}
let uuid = &row.uuid.as_bytes()[..];
insert.execute_named(&[
@ -546,9 +614,10 @@ impl<'a> Transaction<'a> {
}
// Unreserve the sample file uuid and insert the recording row.
if self.state.cameras_by_id.get_mut(&r.camera_id).is_none() {
return Err(Error::new(format!("no such camera id {}", r.camera_id)));
}
let cam = match self.state.cameras_by_id.get_mut(&r.camera_id) {
None => return Err(Error::new(format!("no such camera id {}", r.camera_id))),
Some(c) => c,
};
let uuid = &r.sample_file_uuid.as_bytes()[..];
{
let mut stmt = self.tx.prepare_cached(DELETE_RESERVATION_SQL)?;
@ -558,11 +627,16 @@ impl<'a> Transaction<'a> {
}
}
self.must_rollback = true;
let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
{
let recording_id = m.new_next_recording_id.unwrap_or(cam.next_recording_id);
let composite_id = composite_id(r.camera_id, recording_id);
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_SQL)?;
let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[
(":composite_id", &composite_id),
(":camera_id", &(r.camera_id as i64)),
(":run_offset", &r.run_offset),
(":flags", &r.flags),
(":sample_file_bytes", &r.sample_file_bytes),
(":start_time_90k", &r.time.start.0),
(":duration_90k", &(r.time.end.0 - r.time.start.0)),
@ -570,13 +644,23 @@ impl<'a> Transaction<'a> {
(":video_samples", &r.video_samples),
(":video_sync_samples", &r.video_sync_samples),
(":video_sample_entry_id", &r.video_sample_entry_id),
])?;
m.new_next_recording_id = Some(recording_id + 1);
let mut stmt = self.tx.prepare_cached(INSERT_RECORDING_PLAYBACK_SQL)?;
let sha1 = &r.sample_file_sha1[..];
stmt.execute_named(&[
(":composite_id", &composite_id),
(":sample_file_uuid", &uuid),
(":sample_file_sha1", &sha1),
(":video_index", &r.video_index),
])?;
let mut stmt = self.tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?;
stmt.execute_named(&[
(":camera_id", &(r.camera_id as i64)),
(":next_recording_id", &m.new_next_recording_id),
])?;
}
self.must_rollback = false;
let mut m = Transaction::get_mods_by_camera(&mut self.mods_by_camera, r.camera_id);
m.duration += r.time.end - r.time.start;
m.sample_file_bytes += r.sample_file_bytes as i64;
adjust_days(r.time.clone(), 1, &mut m.days);
@ -597,6 +681,9 @@ impl<'a> Transaction<'a> {
adjust_day(*k, *v, &mut camera.days);
}
camera.range = m.range.clone();
if let Some(id) = m.new_next_recording_id {
camera.next_recording_id = id;
}
}
Ok(())
}
@ -618,6 +705,7 @@ impl<'a> Transaction<'a> {
sample_file_bytes: 0,
range: None,
days: BTreeMap::new(),
new_next_recording_id: None,
}
})
}
@ -697,32 +785,53 @@ impl LockedDatabase {
/// Lists the specified recordings in ascending order, passing them to a supplied function.
/// Given that the function is called with the database lock held, it should be quick.
pub fn list_recordings<F>(&self, camera_id: i32, desired_time: &Range<recording::Time>,
mut f: F) -> Result<(), Error>
where F: FnMut(ListCameraRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_sql)?;
let mut rows = stmt.query_named(&[
pub fn list_recordings_by_time<F>(&self, camera_id: i32, desired_time: Range<recording::Time>,
f: F) -> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(&self.state.list_recordings_by_time_sql)?;
let rows = stmt.query_named(&[
(":camera_id", &camera_id),
(":start_time_90k", &desired_time.start.0),
(":end_time_90k", &desired_time.end.0)])?;
self.list_recordings_inner(camera_id, rows, f)
}
pub fn list_recordings_by_id<F>(&self, camera_id: i32, desired_ids: Range<i32>, f: F)
-> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
let mut stmt = self.conn.prepare_cached(LIST_RECORDINGS_BY_ID_SQL)?;
let rows = stmt.query_named(&[
(":start", &composite_id(camera_id, desired_ids.start)),
(":end", &composite_id(camera_id, desired_ids.end)),
])?;
self.list_recordings_inner(camera_id, rows, f)
}
fn list_recordings_inner<F>(&self, camera_id: i32, mut rows: rusqlite::Rows, mut f: F)
-> Result<(), Error>
where F: FnMut(ListRecordingsRow) -> Result<(), Error> {
while let Some(row) = rows.next() {
let row = row?;
let id = row.get_checked(0)?;
let vse_id = row.get_checked(6)?;
let id = row.get_checked::<_, i64>(0)? as i32; // drop top bits of composite_id.
let vse_id = row.get_checked(8)?;
let video_sample_entry = match self.state.video_sample_entries.get(&vse_id) {
Some(v) => v,
None => {
return Err(Error::new(format!(
"recording {} references nonexistent video_sample_entry {}", id, vse_id)));
"recording {}/{} references nonexistent video_sample_entry {}",
camera_id, id, vse_id)));
},
};
let out = ListCameraRecordingsRow{
let out = ListRecordingsRow{
camera_id: camera_id,
id: id,
start: recording::Time(row.get_checked(1)?),
duration_90k: row.get_checked(2)?,
sample_file_bytes: row.get_checked(3)?,
video_samples: row.get_checked(4)?,
video_sync_samples: row.get_checked(5)?,
run_offset: row.get_checked(1)?,
flags: row.get_checked(2)?,
start: recording::Time(row.get_checked(3)?),
duration_90k: row.get_checked(4)?,
sample_file_bytes: row.get_checked(5)?,
video_samples: row.get_checked(6)?,
video_sync_samples: row.get_checked(7)?,
video_sample_entry: video_sample_entry.clone(),
};
f(out)?;
@ -730,73 +839,101 @@ impl LockedDatabase {
Ok(())
}
/// Convenience method which calls `list_recordings` and aggregates consecutive recordings.
/// Calls `list_recordings_by_time` and aggregates consecutive recordings.
/// Rows are given to the callback in arbitrary order. Callers which care about ordering
/// should do their own sorting.
pub fn list_aggregated_recordings<F>(&self, camera_id: i32,
desired_time: &Range<recording::Time>,
desired_time: Range<recording::Time>,
forced_split: recording::Duration,
mut f: F) -> Result<(), Error>
where F: FnMut(ListAggregatedRecordingsRow) -> Result<(), Error> {
let mut agg: Option<ListAggregatedRecordingsRow> = None;
self.list_recordings(camera_id, desired_time, |row| {
let needs_flush = if let Some(ref a) = agg {
let new_dur = a.range.end - a.range.start +
where F: FnMut(&ListAggregatedRecordingsRow) -> Result<(), Error> {
// Iterate, maintaining a map from a recording_id to the aggregated row for the latest
// batch of recordings from the run starting at that id. Runs can be split into multiple
// batches for a few reasons:
//
// * forced split (when exceeding a duration limit)
// * a missing id (one that was deleted out of order)
// * video_sample_entry mismatch (if the parameters changed during an RTSP session)
//
// This iteration works because in a run, the start_time+duration of recording id r
// is equal to the start_time of recording id r+1. Thus ascending times guarantees
// ascending ids within a run. (Different runs, however, can be arbitrarily interleaved if
// their timestamps overlap. Tracking all active runs prevents that interleaving from
// causing problems.)
let mut aggs: BTreeMap<i32, ListAggregatedRecordingsRow> = BTreeMap::new();
self.list_recordings_by_time(camera_id, desired_time, |row| {
let run_start_id = row.id - row.run_offset;
let needs_flush = if let Some(a) = aggs.get(&run_start_id) {
let new_dur = a.time.end - a.time.start +
recording::Duration(row.duration_90k as i64);
a.range.end != row.start ||
row.video_sample_entry.id != a.video_sample_entry.id || new_dur >= forced_split
a.ids.end != row.id || row.video_sample_entry.id != a.video_sample_entry.id ||
new_dur >= forced_split
} else {
false
};
if needs_flush {
let a = agg.take().expect("needs_flush when agg is none");
f(a)?;
let a = aggs.remove(&run_start_id).expect("needs_flush when agg is None");
f(&a)?;
}
match agg {
None => {
agg = Some(ListAggregatedRecordingsRow{
range: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
let need_insert = if let Some(ref mut a) = aggs.get_mut(&run_start_id) {
if a.time.end != row.start {
return Err(Error::new(format!(
"camera {} recording {} ends at {}; {} starts at {}; expected same",
camera_id, a.ids.end - 1, a.time.end, row.id, row.start)));
}
a.time.end.0 += row.duration_90k as i64;
a.ids.end = row.id + 1;
a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64;
false
} else {
true
};
if need_insert {
aggs.insert(run_start_id, ListAggregatedRecordingsRow{
time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
ids: row.id .. row.id+1,
video_samples: row.video_samples as i64,
video_sync_samples: row.video_sync_samples as i64,
sample_file_bytes: row.sample_file_bytes as i64,
video_sample_entry: row.video_sample_entry,
camera_id: camera_id,
run_start_id: row.id - row.run_offset,
flags: row.flags,
});
},
Some(ref mut a) => {
a.range.end.0 += row.duration_90k as i64;
a.video_samples += row.video_samples as i64;
a.video_sync_samples += row.video_sync_samples as i64;
a.sample_file_bytes += row.sample_file_bytes as i64;
}
};
Ok(())
})?;
if let Some(a) = agg {
for a in aggs.values() {
f(a)?;
}
Ok(())
}
/// Gets extra data about a single recording.
/// Gets a single `recording_playback` row.
/// This uses a LRU cache to reduce the number of retrievals from the database.
pub fn get_recording(&self, recording_id: i64)
-> Result<Arc<ExtraRecording>, Error> {
let mut cache = self.state.recording_cache.borrow_mut();
if let Some(r) = cache.get_mut(&recording_id) {
debug!("cache hit for recording {}", recording_id);
pub fn get_recording_playback(&self, camera_id: i32, recording_id: i32)
-> Result<Arc<RecordingPlayback>, Error> {
let composite_id = composite_id(camera_id, recording_id);
let mut cache = self.state.playback_cache.borrow_mut();
if let Some(r) = cache.get_mut(&composite_id) {
trace!("cache hit for recording {}/{}", camera_id, recording_id);
return Ok(r.clone());
}
debug!("cache miss for recording {}", recording_id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_SQL)?;
let mut rows = stmt.query_named(&[(":id", &recording_id)])?;
trace!("cache miss for recording {}/{}", camera_id, recording_id);
let mut stmt = self.conn.prepare_cached(GET_RECORDING_PLAYBACK_SQL)?;
let mut rows = stmt.query_named(&[(":composite_id", &composite_id)])?;
if let Some(row) = rows.next() {
let row = row?;
let r = Arc::new(ExtraRecording{
let r = Arc::new(RecordingPlayback{
sample_file_uuid: get_uuid(&row, 0)?,
video_index: row.get_checked(1)?,
});
cache.insert(recording_id, r.clone());
cache.insert(composite_id, r.clone());
return Ok(r);
}
Err(Error::new(format!("no such recording {}", recording_id)))
Err(Error::new(format!("no such recording {}/{}", camera_id, recording_id)))
}
/// Lists all reserved sample files.
@ -816,15 +953,19 @@ impl LockedDatabase {
pub fn list_oldest_sample_files<F>(&self, camera_id: i32, mut f: F) -> Result<(), Error>
where F: FnMut(ListOldestSampleFilesRow) -> bool {
let mut stmt = self.conn.prepare_cached(LIST_OLDEST_SAMPLE_FILES_SQL)?;
let mut rows = stmt.query_named(&[(":camera_id", &(camera_id as i64))])?;
let mut rows = stmt.query_named(&[
(":start", &composite_id(camera_id, 0)),
(":end", &composite_id(camera_id + 1, 0)),
])?;
while let Some(row) = rows.next() {
let row = row?;
let start = recording::Time(row.get_checked(2)?);
let duration = recording::Duration(row.get_checked(3)?);
let composite_id: i64 = row.get_checked(0)?;
let should_continue = f(ListOldestSampleFilesRow{
recording_id: row.get_checked(0)?,
recording_id: composite_id as i32,
camera_id: (composite_id >> 32) as i32,
uuid: get_uuid(&row, 1)?,
camera_id: camera_id,
time: start .. start + duration,
sample_file_bytes: row.get_checked(4)?,
});
@ -888,7 +1029,8 @@ impl LockedDatabase {
camera.password,
camera.main_rtsp_path,
camera.sub_rtsp_path,
camera.retain_bytes
camera.retain_bytes,
next_recording_id
from
camera;
"#)?;
@ -912,6 +1054,7 @@ impl LockedDatabase {
sample_file_bytes: 0,
duration: recording::Duration(0),
days: BTreeMap::new(),
next_recording_id: row.get_checked(10)?,
});
self.state.cameras_by_uuid.insert(uuid, id);
}
@ -970,9 +1113,11 @@ pub struct Database(Mutex<LockedDatabase>);
impl Database {
/// Creates the database from a caller-supplied SQLite connection.
pub fn new(conn: rusqlite::Connection) -> Result<Database, Error> {
let list_recordings_sql = format!(r#"
let list_recordings_by_time_sql = format!(r#"
select
recording.id,
recording.composite_id,
recording.run_offset,
recording.flags,
recording.start_time_90k,
recording.duration_90k,
recording.sample_file_bytes,
@ -987,7 +1132,7 @@ impl Database {
recording.start_time_90k < :end_time_90k and
recording.start_time_90k + recording.duration_90k > :start_time_90k
order by
recording.start_time_90k
recording.composite_id
"#, recording::MAX_RECORDING_DURATION);
{
use std::error::Error as E;
@ -1025,8 +1170,8 @@ impl Database {
cameras_by_id: BTreeMap::new(),
cameras_by_uuid: BTreeMap::new(),
video_sample_entries: BTreeMap::new(),
recording_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_sql: list_recordings_sql,
playback_cache: RefCell::new(LruCache::with_hasher(1024, Default::default())),
list_recordings_by_time_sql: list_recordings_by_time_sql,
},
}));
{
@ -1078,9 +1223,9 @@ mod tests {
let uuid_bytes = &uuid.as_bytes()[..];
conn.execute_named(r#"
insert into camera (uuid, short_name, description, host, username, password,
main_rtsp_path, sub_rtsp_path, retain_bytes)
main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id)
values (:uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes)
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#, &[
(":uuid", &uuid_bytes),
(":short_name", &short_name),
@ -1091,6 +1236,7 @@ mod tests {
(":main_rtsp_path", &"/main"),
(":sub_rtsp_path", &"/sub"),
(":retain_bytes", &42i64),
(":next_recording_id", &0i64),
]).unwrap();
conn.last_insert_rowid() as i32
}
@ -1121,7 +1267,7 @@ mod tests {
{
let db = db.lock();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(camera_id, &all_time, |_row| {
db.list_recordings_by_time(camera_id, all_time, |_row| {
rows += 1;
Ok(())
}).unwrap();
@ -1152,7 +1298,7 @@ mod tests {
{
let db = db.lock();
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(camera_id, &all_time, |row| {
db.list_recordings_by_time(camera_id, all_time, |row| {
rows += 1;
recording_id = row.id;
assert_eq!(r.time,
@ -1176,7 +1322,8 @@ mod tests {
}).unwrap();
assert_eq!(1, rows);
// TODO: get_recording.
// TODO: list_aggregated_recordings.
// TODO: get_recording_playback.
}
fn assert_unsorted_eq<T>(mut a: Vec<T>, mut b: Vec<T>)
@ -1251,10 +1398,10 @@ mod tests {
fn test_version_too_old() {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (-1, 0, '');").unwrap();
c.execute_batch("delete from version; insert into version values (0, 0, '');").unwrap();
let e = Database::new(c).unwrap_err();
assert!(e.description().starts_with(
"Database schema version -1 is too old (expected 0)"), "got: {:?}",
"Database schema version 0 is too old (expected 1)"), "got: {:?}",
e.description());
}
@ -1262,10 +1409,10 @@ mod tests {
fn test_version_too_new() {
testutil::init();
let c = setup_conn();
c.execute_batch("delete from version; insert into version values (1, 0, '');").unwrap();
c.execute_batch("delete from version; insert into version values (2, 0, '');").unwrap();
let e = Database::new(c).unwrap_err();
assert!(e.description().starts_with(
"Database schema version 1 is too new (expected 0)"), "got: {:?}", e.description());
"Database schema version 2 is too new (expected 1)"), "got: {:?}", e.description());
}
/// Basic test of running some queries on a fresh database.
@ -1310,6 +1457,8 @@ mod tests {
let recording = RecordingToInsert{
camera_id: camera_id,
sample_file_bytes: 42,
run_offset: 0,
flags: 0,
time: start .. start + recording::Duration(TIME_UNITS_PER_SEC),
local_time: start,
video_samples: 1,


@ -33,9 +33,9 @@
//! This includes opening files for serving, rotating away old files, and saving new files.
use db;
use error::Error;
use libc;
use recording;
use error::Error;
use openssl::crypto::hash;
use std::ffi;
use std::fs;
@ -121,7 +121,7 @@ impl SampleFileDir {
/// directory has sufficient space for a couple recordings per camera in addition to the
/// cameras' total `retain_bytes`.
pub fn create_writer<'a>(&self, channel: &'a SyncerChannel, start: recording::Time,
local_start: recording::Time, camera_id: i32,
local_start: recording::Time, run_offset: i32, camera_id: i32,
video_sample_entry_id: i32) -> Result<Writer<'a>, Error> {
// Grab the next uuid. Typically one is cached—a sync has usually completed since the last
// writer was created, and syncs ensure `next_uuid` is filled while performing their
@ -145,7 +145,8 @@ impl SampleFileDir {
return Err(e.into());
},
};
Writer::open(f, uuid, start, local_start, camera_id, video_sample_entry_id, channel)
Writer::open(f, uuid, start, local_start, run_offset, camera_id, video_sample_entry_id,
channel)
}
/// Opens a sample file within this directory with the given flags and (if creating) mode.
@ -428,6 +429,7 @@ struct InnerWriter<'a> {
local_time: recording::Time,
camera_id: i32,
video_sample_entry_id: i32,
run_offset: i32,
/// A sample which has been written to disk but not added to `index`. Index writes are one
/// sample behind disk writes because the duration of a sample is the difference between its
@ -446,8 +448,8 @@ struct UnflushedSample {
impl<'a> Writer<'a> {
/// Opens the writer; for use by `SampleFileDir` (which should supply `f`).
fn open(f: fs::File, uuid: Uuid, start_time: recording::Time, local_time: recording::Time,
camera_id: i32, video_sample_entry_id: i32, syncer_channel: &'a SyncerChannel)
-> Result<Self, Error> {
run_offset: i32, camera_id: i32, video_sample_entry_id: i32,
syncer_channel: &'a SyncerChannel) -> Result<Self, Error> {
Ok(Writer(Some(InnerWriter{
syncer_channel: syncer_channel,
f: f,
@ -459,6 +461,7 @@ impl<'a> Writer<'a> {
local_time: local_time,
camera_id: camera_id,
video_sample_entry_id: video_sample_entry_id,
run_offset: run_offset,
unflushed_sample: None,
})))
}
@ -530,6 +533,8 @@ impl<'a> InnerWriter<'a> {
sample_file_uuid: self.uuid,
video_index: self.index.video_index,
sample_file_sha1: sha1_bytes,
run_offset: self.run_offset,
flags: 0, // TODO
};
self.syncer_channel.async_save_recording(recording, self.f);
Ok(end)


@ -81,6 +81,7 @@ mod stream;
mod streamer;
mod strutil;
#[cfg(test)] mod testutil;
mod upgrade;
mod web;
/// Commandline usage string. This is in the particular format expected by the `docopt` crate.
@ -88,20 +89,30 @@ mod web;
/// allowed commandline arguments and their defaults.
const USAGE: &'static str = "
Usage: moonfire-nvr [options]
moonfire-nvr --upgrade [options]
moonfire-nvr (--help | --version)
Options:
-h, --help Show this message.
--version Show the version of moonfire-nvr.
--db-dir DIR Set the directory holding the SQLite3 index database.
--db-dir=DIR Set the directory holding the SQLite3 index database.
This is typically on a flash device.
[default: /var/lib/moonfire-nvr/db]
--sample-file-dir DIR Set the directory holding video data.
--sample-file-dir=DIR Set the directory holding video data.
This is typically on a hard drive.
[default: /var/lib/moonfire-nvr/sample]
--http-addr ADDR Set the bind address for the unencrypted HTTP server.
--http-addr=ADDR Set the bind address for the unencrypted HTTP server.
[default: 0.0.0.0:8080]
--read-only Forces read-only mode / disables recording.
--preset-journal=MODE With --upgrade, resets the SQLite journal_mode to
the specified mode prior to the upgrade. The default,
delete, is recommended. off is very dangerous but
may be desirable in some circumstances. See
guide/schema.md for more information. The journal
mode will be reset to wal after the upgrade.
[default: delete]
--no-vacuum With --upgrade, skips the normal post-upgrade vacuum
operation.
";
/// Commandline arguments corresponding to `USAGE`; automatically filled by the `docopt` crate.
@ -111,9 +122,18 @@ struct Args {
flag_sample_file_dir: String,
flag_http_addr: String,
flag_read_only: bool,
flag_upgrade: bool,
flag_no_vacuum: bool,
flag_preset_journal: String,
}
fn main() {
// Parse commandline arguments.
let version = "Moonfire NVR 0.1.0".to_owned();
let args: Args = docopt::Docopt::new(USAGE)
.and_then(|d| d.version(Some(version)).decode())
.unwrap_or_else(|e| e.exit());
// Watch for termination signals.
// This must be started before any threads are spawned (such as the async logger thread) so
// that signals will be blocked in all threads.
@ -124,12 +144,6 @@ fn main() {
let drain = slog_envlogger::new(drain);
slog_stdlog::set_logger(slog::Logger::root(drain.ignore_err(), None)).unwrap();
// Parse commandline arguments.
let version = "Moonfire NVR 0.1.0".to_owned();
let args: Args = docopt::Docopt::new(USAGE)
.and_then(|d| d.version(Some(version)).decode())
.unwrap_or_else(|e| e.exit());
// Open the database and populate cached state.
let db_dir = dir::Fd::open(&args.flag_db_dir).unwrap();
db_dir.lock(if args.flag_read_only { libc::LOCK_SH } else { libc::LOCK_EX } | libc::LOCK_NB)
@ -144,6 +158,15 @@ fn main() {
// rusqlite::Connection is not Sync, so there's no reason to tell SQLite3 to use the
// serialized threading mode.
rusqlite::SQLITE_OPEN_NO_MUTEX).unwrap();
if args.flag_upgrade {
upgrade::run(conn, &args.flag_preset_journal, args.flag_no_vacuum).unwrap();
} else {
run(args, conn, &signal);
}
}
fn run(args: Args, conn: rusqlite::Connection, signal: &chan::Receiver<chan_signal::Signal>) {
let db = Arc::new(db::Database::new(conn).unwrap());
let dir = dir::SampleFileDir::new(&args.flag_sample_file_dir, db.clone()).unwrap();
info!("Database is loaded.");


@ -543,11 +543,16 @@ impl Mp4FileBuilder {
self.segments.reserve(additional);
}
pub fn len(&self) -> usize { self.segments.len() }
/// Appends a segment for (a subset of) the given recording.
pub fn append(&mut self, db: &MutexGuard<db::LockedDatabase>, row: db::ListCameraRecordingsRow,
pub fn append(&mut self, db: &MutexGuard<db::LockedDatabase>, row: db::ListRecordingsRow,
rel_range_90k: Range<i32>) -> Result<()> {
if let Some(prev) = self.segments.last() {
if prev.s.have_trailing_zero {
return Err(Error::new(format!(
"unable to append recording {}/{} after recording {}/{} with trailing zero",
row.camera_id, row.id, prev.s.camera_id, prev.s.recording_id)));
}
}
self.segments.push(Mp4Segment{
s: recording::Segment::new(db, &row, rel_range_90k)?,
index: RefCell::new(None),
@ -591,7 +596,8 @@ impl Mp4FileBuilder {
// Update the etag to reflect this segment.
let mut data = [0_u8; 24];
let mut cursor = io::Cursor::new(&mut data[..]);
cursor.write_i64::<BigEndian>(s.s.id)?;
cursor.write_i32::<BigEndian>(s.s.camera_id)?;
cursor.write_i32::<BigEndian>(s.s.recording_id)?;
cursor.write_i64::<BigEndian>(s.s.start.0)?;
cursor.write_i32::<BigEndian>(d.start)?;
cursor.write_i32::<BigEndian>(d.end)?;
@ -1129,7 +1135,8 @@ impl Mp4File {
fn write_video_sample_data(&self, i: usize, r: Range<u64>, out: &mut io::Write) -> Result<()> {
let s = &self.segments[i];
let f = self.dir.open_sample_file(self.db.lock().get_recording(s.s.id)?.sample_file_uuid)?;
let rec = self.db.lock().get_recording_playback(s.s.camera_id, s.s.recording_id)?;
let f = self.dir.open_sample_file(rec.sample_file_uuid)?;
mmapfile::MmapFileSlice::new(f, s.s.sample_file_range()).write_to(r, out)
}
@ -1180,8 +1187,8 @@ mod tests {
use byteorder::{BigEndian, ByteOrder};
use db;
use dir;
use ffmpeg;
use error::Error;
use ffmpeg;
#[cfg(nightly)] use hyper;
use hyper::header;
use openssl::crypto::hash;
@ -1217,10 +1224,10 @@ mod tests {
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
/// Returns the SHA-1 digest of the given `Resource`.
fn digest(r: &http_entity::Entity<Error>) -> Vec<u8> {
/// Returns the SHA-1 digest of the given `Entity`.
fn digest(e: &http_entity::Entity<Error>) -> Vec<u8> {
let mut sha1 = Sha1::new();
r.write_to(0 .. r.len(), &mut sha1).unwrap();
e.write_to(0 .. e.len(), &mut sha1).unwrap();
sha1.finish()
}
@ -1401,7 +1408,7 @@ mod tests {
let extra_data = input.get_extra_data().unwrap();
let video_sample_entry_id = db.db.lock().insert_video_sample_entry(
extra_data.width, extra_data.height, &extra_data.sample_entry).unwrap();
let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME,
let mut output = db.dir.create_writer(&db.syncer_channel, START_TIME, START_TIME, 0,
TEST_CAMERA_ID, video_sample_entry_id).unwrap();
// end_pts is the pts of the end of the most recent frame (start + duration).
@ -1435,6 +1442,7 @@ mod tests {
let mut recording = db::RecordingToInsert{
camera_id: TEST_CAMERA_ID,
sample_file_bytes: 30104460,
flags: 0,
time: START_TIME .. (START_TIME + DURATION),
local_time: START_TIME,
video_samples: 1800,
@ -1443,6 +1451,7 @@ mod tests {
sample_file_uuid: Uuid::nil(),
video_index: data,
sample_file_sha1: [0; 20],
run_offset: 0,
};
let mut tx = db.tx().unwrap();
tx.bypass_reservation_for_testing = true;
@ -1451,6 +1460,7 @@ mod tests {
recording.time.start += DURATION;
recording.local_time += DURATION;
recording.time.end += DURATION;
recording.run_offset += 1;
}
tx.commit().unwrap();
}
@ -1462,7 +1472,7 @@ mod tests {
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
{
let db = db.lock();
db.list_recordings(TEST_CAMERA_ID, &all_time, |r| {
db.list_recordings_by_time(TEST_CAMERA_ID, all_time, |r| {
let d = r.duration_90k;
assert!(skip_90k + shorten_90k < d);
builder.append(&db, r, skip_90k .. d - shorten_90k).unwrap();
@ -1658,7 +1668,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "3c48af4dbce2024db07f27a00789b6af774a8c89";
const EXPECTED_ETAG: &'static str = "908ae8ac303f66f2f4a1f8f52dba8f6ea9fdb442";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1678,7 +1688,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "c24d7af372e5d8f66f4feb6e3a5cd43828392371";
const EXPECTED_ETAG: &'static str = "e21c6a6dfede1081db3701cc595ec267c43c2bff";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1698,7 +1708,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("685e026af44204bc9cc52115c5e17058e9fb7c70", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "870e2b3cfef4a988951344b32e53af0d4496894d";
const EXPECTED_ETAG: &'static str = "1d5c5980f6ba08a4dd52dfd785667d42cdb16992";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();
@ -1718,7 +1728,7 @@ mod tests {
// combine ranges from the new format with ranges from the old format.
let sha1 = digest(&mp4);
assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..]));
const EXPECTED_ETAG: &'static str = "71c329188a2cd175c8d61492a9789e242af06c05";
const EXPECTED_ETAG: &'static str = "555de64b39615e1a1cbe5bdd565ff197f5f126c5";
assert_eq!(Some(&header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag());
drop(db.syncer_channel);
db.syncer_join.join().unwrap();


@ -340,7 +340,8 @@ impl SampleIndexEncoder {
/// A segment represents a view of some or all of a single recording, starting from a key frame.
/// Used by the `Mp4FileBuilder` class to splice together recordings into a single virtual .mp4.
pub struct Segment {
pub id: i64,
pub camera_id: i32,
pub recording_id: i32,
pub start: Time,
begin: SampleIndexIterator,
pub file_end: i32,
@ -349,6 +350,7 @@ pub struct Segment {
pub frames: i32,
pub key_frames: i32,
pub video_sample_entry_id: i32,
pub have_trailing_zero: bool,
}
impl Segment {
@ -360,10 +362,11 @@ impl Segment {
/// undesired portion.) It will end at the first frame after the desired range (unless the
/// desired range extends beyond the recording).
pub fn new(db: &MutexGuard<db::LockedDatabase>,
recording: &db::ListCameraRecordingsRow,
recording: &db::ListRecordingsRow,
desired_range_90k: Range<i32>) -> Result<Segment, Error> {
let mut self_ = Segment{
id: recording.id,
camera_id: recording.camera_id,
recording_id: recording.id,
start: recording.start,
begin: SampleIndexIterator::new(),
file_end: recording.sample_file_bytes,
@ -372,6 +375,7 @@ impl Segment {
frames: recording.video_samples,
key_frames: recording.video_sync_samples,
video_sample_entry_id: recording.video_sample_entry.id,
have_trailing_zero: (recording.flags & db::RecordingFlags::TrailingZero as i32) != 0,
};
if self_.desired_range_90k.start > self_.desired_range_90k.end ||
@ -388,8 +392,8 @@ impl Segment {
}
// Slow path. Need to iterate through the index.
let extra = db.get_recording(self_.id)?;
let data = &(&extra).video_index;
let playback = db.get_recording_playback(self_.camera_id, self_.recording_id)?;
let data = &(&playback).video_index;
let mut it = SampleIndexIterator::new();
if !it.next(data)? {
return Err(Error{description: String::from("no index"),
@ -429,6 +433,7 @@ impl Segment {
}
self_.file_end = it.pos;
self_.actual_end_90k = it.start_90k;
self_.have_trailing_zero = it.duration_90k == 0;
Ok(self_)
}
@ -443,38 +448,44 @@ impl Segment {
pub fn foreach<F>(&self, db: &db::Database, mut f: F) -> Result<(), Error>
where F: FnMut(&SampleIndexIterator) -> Result<(), Error>
{
let extra = db.lock().get_recording(self.id)?;
let data = &(&extra).video_index;
trace!("foreach on recording {}/{}: {} frames, actual_time_90k: {:?}",
self.camera_id, self.recording_id, self.frames, self.actual_time_90k());
let playback = db.lock().get_recording_playback(self.camera_id, self.recording_id)?;
let data = &(&playback).video_index;
let mut it = self.begin;
if it.i == 0 {
if !it.next(data)? {
return Err(Error::new(format!("recording {}: no frames", self.id)));
return Err(Error::new(format!("recording {}/{}: no frames",
self.camera_id, self.recording_id)));
}
if !it.is_key {
return Err(Error::new(format!("recording {}: doesn't start with key frame",
self.id)));
return Err(Error::new(format!("recording {}/{}: doesn't start with key frame",
self.camera_id, self.recording_id)));
}
}
let mut have_frame = true;
let mut key_frame = 0;
for i in 0 .. self.frames {
if !have_frame {
return Err(Error::new(format!("recording {}: expected {} frames, found only {}",
self.id, self.frames, i+1)));
return Err(Error::new(format!("recording {}/{}: expected {} frames, found only {}",
self.camera_id, self.recording_id, self.frames,
i+1)));
}
if it.is_key {
key_frame += 1;
if key_frame > self.key_frames {
return Err(Error::new(format!("recording {}: more than expected {} key frames",
self.id, self.key_frames)));
return Err(Error::new(format!(
"recording {}/{}: more than expected {} key frames",
self.camera_id, self.recording_id, self.key_frames)));
}
}
f(&it)?;
have_frame = it.next(data)?;
}
if key_frame < self.key_frames {
return Err(Error::new(format!("recording {}: expected {} key frames, found only {}",
self.id, self.key_frames, key_frame)));
return Err(Error::new(format!("recording {}/{}: expected {} key frames, found only {}",
self.camera_id, self.recording_id, self.key_frames,
key_frame)));
}
Ok(())
}


@ -31,8 +31,6 @@
-- schema.sql: SQLite3 database schema for Moonfire NVR.
-- See also design/schema.md.
--pragma journal_mode = wal;
-- This table tracks the schema version.
-- There is one row for the initial database creation (inserted below, after the
-- create statements) and one for each upgrade procedure (if any).
@ -49,10 +47,10 @@ create table version (
create table camera (
id integer primary key,
uuid blob unique,-- not null check (length(uuid) = 16),
uuid blob unique not null check (length(uuid) = 16),
-- A short name of the camera, used in log messages.
short_name text,-- not null,
short_name text not null,
-- A short description of the camera.
description text,
@ -77,14 +75,42 @@ create table camera (
-- The number of bytes of video to retain, excluding the currently-recording
-- file. Older files will be deleted as necessary to stay within this limit.
retain_bytes integer not null check (retain_bytes >= 0)
retain_bytes integer not null check (retain_bytes >= 0),
-- The low 32 bits of the next recording id to assign for this camera.
-- Typically this is one more than the highest recording id yet assigned for
-- this camera, but it does not decrease if that recording is deleted.
next_recording_id integer not null check (next_recording_id >= 0)
);
-- Each row represents a single completed recorded segment of video.
-- Recordings are typically ~60 seconds; never more than 5 minutes.
create table recording (
id integer primary key,
camera_id integer references camera (id) not null,
-- The high 32 bits of composite_id are taken from the camera's id, which
-- improves locality. The low 32 bits are taken from the camera's
-- next_recording_id (which should be post-incremented in the same
-- transaction). It'd be simpler to use a "without rowid" table and separate
-- fields to make up the primary key, but
-- <https://www.sqlite.org/withoutrowid.html> points out that "without rowid"
-- is not appropriate when the average row size is in excess of 50 bytes.
-- These rows are typically 1--5 KiB.
composite_id integer primary key,
-- This field is redundant with composite_id above, but used to enforce the
-- reference constraint and to structure the recording_cover index.
camera_id integer not null references camera (id),
-- The offset of this recording within a run. 0 means this was the first
-- recording made from an RTSP session. The first recording of the run has
-- composite_id (composite_id - run_offset).
run_offset integer not null,
-- flags is a bitmask:
--
-- * 1, or "trailing zero", indicates that this recording is the last in a
-- stream. As the duration of a sample is not known until the next sample
-- is received, the final sample in this recording will have duration 0.
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
@ -100,18 +126,16 @@ create table recording (
-- The number of 90 kHz units the local system time is ahead of the
-- recording; negative numbers indicate the local system time is behind
-- the recording. Large values would indicate that the local time has jumped
-- during recording or that the local time and camera time frequencies do
-- not match.
-- the recording. Large absolute values would indicate that the local time
-- has jumped during recording or that the local time and camera time
-- frequencies do not match.
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
@ -124,11 +148,17 @@ create index recording_cover on recording (
-- to consult the underlying row.
duration_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_bytes
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
-- Files in the sample file directory which may be present but should simply be
-- discarded on startup. (Recordings which were never completed or have been
-- marked for deletion.)
@ -156,4 +186,4 @@ create table video_sample_entry (
);
insert into version (id, unix_time, notes)
values (0, cast(strftime('%s', 'now') as int), 'db creation');
values (1, cast(strftime('%s', 'now') as int), 'db creation');

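The composite_id packing described in the `recording` table's comments is
plain bit arithmetic. A sketch of the round trip (these helpers are
illustrative only, not functions from this commit):

    // Pack: high 32 bits camera id, low 32 bits per-camera recording id.
    fn composite_id(camera_id: i32, recording_id: i32) -> i64 {
        ((camera_id as i64) << 32) | (recording_id as i64)
    }

    // Unpack; mirrors the schema's "check (composite_id >> 32 = camera_id)".
    fn split_id(id: i64) -> (i32, i32) {
        ((id >> 32) as i32, (id & 0xFFFF_FFFF) as i32)
    }

    fn main() {
        assert_eq!(composite_id(5, 1), 0x5_0000_0001);
        assert_eq!(split_id(0x5_0000_0001), (5, 1));
    }

Keying the primary key on this value keeps each camera's recordings adjacent
in the table's b-tree, which is the locality the comment refers to.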
View File

@ -117,6 +117,7 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
let mut writer: Option<dir::Writer> = None;
let mut transformed = Vec::new();
let mut next_start = None;
let mut run_index = -1;
while !self.shutdown.load(Ordering::SeqCst) {
let pkt = stream.get_next()?;
let pts = pkt.pts().ok_or_else(|| Error::new("packet with no pts".to_owned()))?;
@ -144,9 +145,10 @@ impl<'a, C, S> Streamer<'a, C, S> where C: 'a + Clock, S: 'a + stream::Stream {
if r <= frame_realtime.sec { r + self.rotate_interval_sec } else { r });
let local_realtime = recording::Time::new(frame_realtime);
run_index += 1;
self.dir.create_writer(&self.syncer_channel,
next_start.unwrap_or(local_realtime), local_realtime,
self.camera_id, video_sample_entry_id)?
run_index, self.camera_id, video_sample_entry_id)?
},
};
let orig_data = match pkt.data() {
@ -276,8 +278,9 @@ mod tests {
is_key: bool,
}
fn get_frames(db: &MutexGuard<db::LockedDatabase>, recording_id: i64) -> Vec<Frame> {
let rec = db.get_recording(recording_id).unwrap();
fn get_frames(db: &MutexGuard<db::LockedDatabase>, camera_id: i32, recording_id: i32)
-> Vec<Frame> {
let rec = db.get_recording_playback(camera_id, recording_id).unwrap();
let mut it = recording::SampleIndexIterator::new();
let mut frames = Vec::new();
while it.next(&rec.video_index).unwrap() {
@ -328,7 +331,7 @@ mod tests {
// Compare frame-by-frame. Note below that while the rotation is scheduled to happen near
// 5-second boundaries (such as 2016-04-26 00:00:05), it gets deferred until the next key
// frame, which in this case is 00:00:07.
assert_eq!(get_frames(&db, 1), &[
assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 1), &[
Frame{start_90k: 0, duration_90k: 90379, is_key: true},
Frame{start_90k: 90379, duration_90k: 89884, is_key: false},
Frame{start_90k: 180263, duration_90k: 89749, is_key: false},
@ -338,7 +341,7 @@ mod tests {
Frame{start_90k: 540015, duration_90k: 90021, is_key: false},
Frame{start_90k: 630036, duration_90k: 89958, is_key: false},
]);
assert_eq!(get_frames(&db, 2), &[
assert_eq!(get_frames(&db, testutil::TEST_CAMERA_ID, 2), &[
Frame{start_90k: 0, duration_90k: 90011, is_key: true},
Frame{start_90k: 90011, duration_90k: 0, is_key: false},
]);

View File

@ -88,9 +88,9 @@ impl TestDb {
let uuid_bytes = &TEST_CAMERA_UUID.as_bytes()[..];
conn.execute_named(r#"
insert into camera (uuid, short_name, description, host, username, password,
main_rtsp_path, sub_rtsp_path, retain_bytes)
main_rtsp_path, sub_rtsp_path, retain_bytes, next_recording_id)
values (:uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes)
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#, &[
(":uuid", &uuid_bytes),
(":short_name", &"test camera"),
@ -101,6 +101,7 @@ impl TestDb {
(":main_rtsp_path", &"/main"),
(":sub_rtsp_path", &"/sub"),
(":retain_bytes", &1048576i64),
(":next_recording_id", &1i64),
]).unwrap();
assert_eq!(TEST_CAMERA_ID as i64, conn.last_insert_rowid());
let db = sync::Arc::new(db::Database::new(conn).unwrap());
@ -117,7 +118,7 @@ impl TestDb {
}
pub fn create_recording_from_encoder(&self, encoder: recording::SampleIndexEncoder)
-> db::ListCameraRecordingsRow {
-> db::ListRecordingsRow {
let mut db = self.db.lock();
let video_sample_entry_id =
db.insert_video_sample_entry(1920, 1080, &[0u8; 100]).unwrap();
@ -137,12 +138,15 @@ impl TestDb {
sample_file_uuid: Uuid::nil(),
video_index: encoder.video_index,
sample_file_sha1: [0u8; 20],
run_offset: 0, // TODO
flags: 0, // TODO
}).unwrap();
tx.commit().unwrap();
}
let mut row = None;
let all_time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value());
db.list_recordings(TEST_CAMERA_ID, &all_time, |r| { row = Some(r); Ok(()) }).unwrap();
db.list_recordings_by_time(TEST_CAMERA_ID, all_time,
|r| { row = Some(r); Ok(()) }).unwrap();
row.unwrap()
}
}

94
src/upgrade/mod.rs Normal file
View File

@ -0,0 +1,94 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Upgrades the database schema.
//!
//! See `guide/schema.md` for more information.
use db;
use error::Error;
use rusqlite;
mod v0_to_v1;
const UPGRADE_NOTES: &'static str =
concat!("upgraded using moonfire-nvr ", env!("CARGO_PKG_VERSION"));
const UPGRADERS: [fn(&rusqlite::Transaction) -> Result<(), Error>; 1] = [
v0_to_v1::run,
];
fn set_journal_mode(conn: &rusqlite::Connection, requested: &str) -> Result<(), Error> {
assert!(!requested.contains(';')); // quick check for accidental sql injection.
let actual = conn.query_row(&format!("pragma journal_mode = {}", requested), &[],
|row| row.get_checked::<_, String>(0))??;
info!("...database now in journal_mode {} (requested {}).", actual, requested);
Ok(())
}
pub fn run(mut conn: rusqlite::Connection, preset_journal: &str,
no_vacuum: bool) -> Result<(), Error> {
{
assert_eq!(UPGRADERS.len(), db::EXPECTED_VERSION as usize);
let old_ver =
conn.query_row("select max(id) from version", &[], |row| row.get_checked(0))??;
if old_ver > db::EXPECTED_VERSION {
return Err(Error::new(format!("Database is at version {}, later than expected {}",
old_ver, db::EXPECTED_VERSION)))?;
} else if old_ver < 0 {
return Err(Error::new(format!("Database is at negative version {}!", old_ver)));
}
info!("Upgrading database from version {} to version {}...", old_ver, db::EXPECTED_VERSION);
set_journal_mode(&conn, preset_journal).unwrap();
for ver in old_ver .. db::EXPECTED_VERSION {
info!("...from version {} to version {}", ver, ver + 1);
let tx = conn.transaction()?;
UPGRADERS[ver as usize](&tx)?;
tx.execute(r#"
insert into version (id, unix_time, notes)
values (?, cast(strftime('%s', 'now') as int32), ?)
"#, &[&(ver + 1), &UPGRADE_NOTES])?;
tx.commit()?;
}
}
// WAL is the preferred journal mode for normal operation; it reduces the number of syncs
// without compromising safety.
set_journal_mode(&conn, "wal").unwrap();
if !no_vacuum {
info!("...vacuuming database after upgrade.");
conn.execute_batch(r#"
pragma page_size = 16384;
vacuum;
"#).unwrap();
}
info!("...done.");
Ok(())
}

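A sketch of how this entry point might be invoked; the `db_path` value and
flag choices are assumed to come from command-line parsing elsewhere, and only
`run`'s signature above is taken from this commit:

    // Open the database and run any pending upgrades. "delete" is SQLite's
    // standard rollback-journal mode; run() switches to WAL when it finishes.
    let conn = rusqlite::Connection::open(&db_path)?;
    upgrade::run(conn, "delete", false /* no_vacuum */)?;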
235
src/upgrade/v0_to_v1.rs Normal file
View File

@ -0,0 +1,235 @@
// This file is part of Moonfire NVR, a security camera digital video recorder.
// Copyright (C) 2016 Scott Lamb <slamb@slamb.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations including
// the two.
//
// You must obey the GNU General Public License in all respects for all
// of the code used other than OpenSSL. If you modify file(s) with this
// exception, you may extend this exception to your version of the
// file(s), but you are not obligated to do so. If you do not wish to do
// so, delete this exception statement from your version. If you delete
// this exception statement from all source files in the program, then
// also delete it here.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Upgrades a version 0 schema to a version 1 schema.
use db;
use error::Error;
use recording;
use rusqlite;
use std::collections::HashMap;
pub fn run(tx: &rusqlite::Transaction) -> Result<(), Error> {
// These create statements match schema.sql as of schema version 1.
tx.execute_batch(r#"
alter table camera rename to old_camera;
create table camera (
id integer primary key,
uuid blob unique,
short_name text not null,
description text,
host text,
username text,
password text,
main_rtsp_path text,
sub_rtsp_path text,
retain_bytes integer not null check (retain_bytes >= 0),
next_recording_id integer not null check (next_recording_id >= 0)
);
alter table recording rename to old_recording;
drop index recording_cover;
create table recording (
composite_id integer primary key,
camera_id integer not null references camera (id),
run_offset integer not null,
flags integer not null,
sample_file_bytes integer not null check (sample_file_bytes > 0),
start_time_90k integer not null check (start_time_90k > 0),
duration_90k integer not null
check (duration_90k >= 0 and duration_90k < 5*60*90000),
local_time_delta_90k integer not null,
video_samples integer not null check (video_samples > 0),
video_sync_samples integer not null check (video_sync_samples > 0),
video_sample_entry_id integer references video_sample_entry (id),
check (composite_id >> 32 = camera_id)
);
create index recording_cover on recording (
start_time_90k,
duration_90k,
video_samples,
video_sample_entry_id,
sample_file_bytes
);
create table recording_playback (
composite_id integer primary key references recording (composite_id),
sample_file_uuid blob not null check (length(sample_file_uuid) = 16),
sample_file_sha1 blob not null check (length(sample_file_sha1) = 20),
video_index blob not null check (length(video_index) > 0)
);
"#)?;
let camera_state = fill_recording(tx)?;
fill_camera(tx, camera_state)?;
tx.execute_batch(r#"
drop table old_camera;
drop table old_recording;
"#)?;
Ok(())
}
struct CameraState {
/// tuple of (run_start_id, next_start_90k).
current_run: Option<(i64, i64)>,
/// As in the `next_recording_id` field of the `camera` table.
next_recording_id: i32,
}
/// Fills the `recording` and `recording_playback` tables from `old_recording`, returning
/// the `camera_state` map for use by a subsequent call to `fill_camera`.
fn fill_recording(tx: &rusqlite::Transaction) -> Result<HashMap<i32, CameraState>, Error> {
let mut select = tx.prepare(r#"
select
camera_id,
sample_file_bytes,
start_time_90k,
duration_90k,
local_time_delta_90k,
video_samples,
video_sync_samples,
video_sample_entry_id,
sample_file_uuid,
sample_file_sha1,
video_index
from
old_recording
"#)?;
let mut insert1 = tx.prepare(r#"
insert into recording values (:composite_id, :camera_id, :run_offset, :flags,
:sample_file_bytes, :start_time_90k, :duration_90k,
:local_time_delta_90k, :video_samples, :video_sync_samples,
:video_sample_entry_id)
"#)?;
let mut insert2 = tx.prepare(r#"
insert into recording_playback values (:composite_id, :sample_file_uuid, :sample_file_sha1,
:video_index)
"#)?;
let mut rows = select.query(&[])?;
let mut camera_state: HashMap<i32, CameraState> = HashMap::new();
while let Some(row) = rows.next() {
let row = row?;
let camera_id: i32 = row.get_checked(0)?;
let camera_state = camera_state.entry(camera_id).or_insert_with(|| {
CameraState{
current_run: None,
next_recording_id: 1,
}
});
let composite_id = ((camera_id as i64) << 32) | (camera_state.next_recording_id as i64);
camera_state.next_recording_id += 1;
let sample_file_bytes: i32 = row.get_checked(1)?;
let start_time_90k: i64 = row.get_checked(2)?;
let duration_90k: i32 = row.get_checked(3)?;
let local_time_delta_90k: i64 = row.get_checked(4)?;
let video_samples: i32 = row.get_checked(5)?;
let video_sync_samples: i32 = row.get_checked(6)?;
let video_sample_entry_id: i32 = row.get_checked(7)?;
let sample_file_uuid: Vec<u8> = row.get_checked(8)?;
let sample_file_sha1: Vec<u8> = row.get_checked(9)?;
let video_index: Vec<u8> = row.get_checked(10)?;
let trailing_zero = {
let mut it = recording::SampleIndexIterator::new();
while it.next(&video_index)? {}
it.duration_90k == 0
};
let run_id = match camera_state.current_run {
Some((run_id, expected_start)) if expected_start == start_time_90k => run_id,
_ => composite_id,
};
insert1.execute_named(&[
(":composite_id", &composite_id),
(":camera_id", &camera_id),
(":run_offset", &(composite_id - run_id)),
(":flags", &(if trailing_zero { db::RecordingFlags::TrailingZero as i32 } else { 0 })),
(":sample_file_bytes", &sample_file_bytes),
(":start_time_90k", &start_time_90k),
(":duration_90k", &duration_90k),
(":local_time_delta_90k", &local_time_delta_90k),
(":video_samples", &video_samples),
(":video_sync_samples", &video_sync_samples),
(":video_sample_entry_id", &video_sample_entry_id),
])?;
insert2.execute_named(&[
(":composite_id", &composite_id),
(":sample_file_uuid", &sample_file_uuid),
(":sample_file_sha1", &sample_file_sha1),
(":video_index", &video_index),
])?;
camera_state.current_run = if trailing_zero {
None
} else {
Some((run_id, start_time_90k + duration_90k as i64))
};
}
Ok(camera_state)
}
fn fill_camera(tx: &rusqlite::Transaction, camera_state: HashMap<i32, CameraState>)
-> Result<(), Error> {
let mut select = tx.prepare(r#"
select
id, uuid, short_name, description, host, username, password, main_rtsp_path,
sub_rtsp_path, retain_bytes
from
old_camera
"#)?;
let mut insert = tx.prepare(r#"
insert into camera values (:id, :uuid, :short_name, :description, :host, :username, :password,
:main_rtsp_path, :sub_rtsp_path, :retain_bytes, :next_recording_id)
"#)?;
let mut rows = select.query(&[])?;
while let Some(row) = rows.next() {
let row = row?;
let id: i32 = row.get_checked(0)?;
let uuid: Vec<u8> = row.get_checked(1)?;
let short_name: String = row.get_checked(2)?;
let description: String = row.get_checked(3)?;
let host: String = row.get_checked(4)?;
let username: String = row.get_checked(5)?;
let password: String = row.get_checked(6)?;
let main_rtsp_path: String = row.get_checked(7)?;
let sub_rtsp_path: String = row.get_checked(8)?;
let retain_bytes: i64 = row.get_checked(9)?;
insert.execute_named(&[
(":id", &id),
(":uuid", &uuid),
(":short_name", &short_name),
(":description", &description),
(":host", &host),
(":username", &username),
(":password", &password),
(":main_rtsp_path", &main_rtsp_path),
(":sub_rtsp_path", &sub_rtsp_path),
(":retain_bytes", &retain_bytes),
(":next_recording_id",
&camera_state.get(&id).map(|s| s.next_recording_id).unwrap_or(1)),
])?;
}
Ok(())
}

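A worked example of the run reconstruction in `fill_recording`: camera 1 with
three old recordings (durations in 90 kHz units, shrunk for readability),
where the second ends with a zero-duration sample, i.e. its RTSP session
ended there:

    start=0   dur=300 trailing_zero=no  -> composite_id=(1<<32)|1, run_offset=0 (starts a run)
    start=300 dur=300 trailing_zero=yes -> composite_id=(1<<32)|2, run_offset=1 (start matches expected 300)
    start=900 dur=300 trailing_zero=no  -> composite_id=(1<<32)|3, run_offset=0 (previous run closed; new run)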
View File

@ -34,14 +34,16 @@ use core::borrow::Borrow;
use core::str::FromStr;
use db;
use dir::SampleFileDir;
use error::{Error, Result};
use error::Error;
use http_entity;
use hyper::{header,server,status};
use hyper::uri::RequestUri;
use mime;
use mp4;
use recording;
use regex::Regex;
use serde_json;
use std::cmp;
use std::fmt;
use std::io::Write;
use std::ops::Range;
@ -57,6 +59,11 @@ const DECIMAL_PREFIXES: &'static [&'static str] =&[" ", " k", " M", " G", " T",
lazy_static! {
static ref JSON: mime::Mime = mime!(Application/Json);
static ref HTML: mime::Mime = mime!(Text/Html);
/// Regex used to parse the `s` query parameter to `view.mp4`.
/// As described in `design/api.md`, this is of the form
/// `START_ID[-END_ID][.[REL_START_TIME]-[REL_END_TIME]]`.
static ref SEGMENTS_RE: Regex = Regex::new(r"^(\d+)(-\d+)?(?:\.(\d+)?-(\d+)?)?$").unwrap();
}
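For concreteness, a few values this regex accepts (offsets in 90 kHz units;
these correspond to the test cases at the bottom of this file):

    s=1          -> all of recording 1
    s=1-5        -> recordings 1 through 5, inclusive
    s=1.26-      -> recording 1, skipping its first 26 units
    s=1-5.26-42  -> recordings 1-5, from 26 to 42 units into that span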
mod json { include!(concat!(env!("OUT_DIR"), "/serde_types.rs")); }
@ -181,18 +188,58 @@ pub struct Handler {
dir: Arc<SampleFileDir>,
}
#[derive(Debug, Eq, PartialEq)]
struct Segments {
ids: Range<i32>,
start_time: i64,
end_time: Option<i64>,
}
impl Segments {
pub fn parse(input: &str) -> Result<Segments, ()> {
let caps = SEGMENTS_RE.captures(input).ok_or(())?;
let ids_start = i32::from_str(caps.at(1).unwrap()).map_err(|_| ())?;
let ids_end = match caps.at(2) {
Some(e) => i32::from_str(&e[1..]).map_err(|_| ())?,
None => ids_start,
} + 1;
if ids_start < 0 || ids_end <= ids_start {
return Err(());
}
let start_time = caps.at(3).map_or(Ok(0), i64::from_str).map_err(|_| ())?;
if start_time < 0 {
return Err(());
}
let end_time = match caps.at(4) {
Some(v) => {
let e = i64::from_str(v).map_err(|_| ())?;
if e <= start_time {
return Err(());
}
Some(e)
},
None => None
};
Ok(Segments{
ids: ids_start .. ids_end,
start_time: start_time,
end_time: end_time,
})
}
}
impl Handler {
pub fn new(db: Arc<db::Database>, dir: Arc<SampleFileDir>) -> Self {
Handler{db: db, dir: dir}
}
fn not_found(&self, mut res: server::Response) -> Result<()> {
fn not_found(&self, mut res: server::Response) -> Result<(), Error> {
*res.status_mut() = status::StatusCode::NotFound;
res.send(b"not found")?;
Ok(())
}
fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<()> {
fn list_cameras(&self, req: &server::Request, mut res: server::Response) -> Result<(), Error> {
let json = is_json(req);
let buf = {
let db = self.db.lock();
@ -207,7 +254,7 @@ impl Handler {
Ok(())
}
fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>> {
fn list_cameras_html(&self, db: MutexGuard<db::LockedDatabase>) -> Result<Vec<u8>, Error> {
let mut buf = Vec::new();
buf.extend_from_slice(b"\
<!DOCTYPE html>\n\
@ -242,7 +289,7 @@ impl Handler {
}
fn camera(&self, uuid: Uuid, query: &str, req: &server::Request, mut res: server::Response)
-> Result<()> {
-> Result<(), Error> {
let json = is_json(req);
let buf = {
let db = self.db.lock();
@ -260,7 +307,7 @@ impl Handler {
}
fn camera_html(&self, db: MutexGuard<db::LockedDatabase>, query: &str,
uuid: Uuid) -> Result<Vec<u8>> {
uuid: Uuid) -> Result<Vec<u8>, Error> {
let r = Handler::get_optional_range(query)?;
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
@ -290,26 +337,30 @@ impl Handler {
// parameters between recordings.
static FORCE_SPLIT_DURATION: recording::Duration =
recording::Duration(60 * 60 * recording::TIME_UNITS_PER_SEC);
db.list_aggregated_recordings(camera.id, &r, FORCE_SPLIT_DURATION, |row| {
let seconds = (row.range.end.0 - row.range.start.0) / recording::TIME_UNITS_PER_SEC;
let mut rows = Vec::new();
db.list_aggregated_recordings(camera.id, r, FORCE_SPLIT_DURATION, |row| {
rows.push(row.clone());
Ok(())
})?;
rows.sort_by(|r1, r2| r1.time.start.cmp(&r2.time.start));
for row in &rows {
let seconds = (row.time.end.0 - row.time.start.0) / recording::TIME_UNITS_PER_SEC;
write!(&mut buf, "\
<tr><td><a href=\"view.mp4?start_time_90k={}&end_time_90k={}\">{}</a></td>\
<tr><td><a href=\"view.mp4?s={}-{}\">{}</a></td>\
<td>{}</td><td>{}x{}</td><td>{:.0}</td><td>{:b}B</td><td>{}bps</td></tr>\n",
row.range.start.0, row.range.end.0,
HumanizedTimestamp(Some(row.range.start)),
HumanizedTimestamp(Some(row.range.end)), row.video_sample_entry.width,
row.ids.start, row.ids.end - 1, HumanizedTimestamp(Some(row.time.start)),
HumanizedTimestamp(Some(row.time.end)), row.video_sample_entry.width,
row.video_sample_entry.height,
if seconds == 0 { 0. } else { row.video_samples as f32 / seconds as f32 },
Humanized(row.sample_file_bytes),
Humanized(if seconds == 0 { 0 } else { row.sample_file_bytes * 8 / seconds }))?;
Ok(())
})?;
};
buf.extend_from_slice(b"</table>\n</html>\n");
Ok(buf)
}
fn camera_recordings(&self, uuid: Uuid, query: &str, req: &server::Request,
mut res: server::Response) -> Result<()> {
mut res: server::Response) -> Result<(), Error> {
let r = Handler::get_optional_range(query)?;
if !is_json(req) {
*res.status_mut() = status::StatusCode::NotAcceptable;
@ -321,11 +372,11 @@ impl Handler {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
db.list_aggregated_recordings(camera.id, &r, recording::Duration(i64::max_value()),
db.list_aggregated_recordings(camera.id, r, recording::Duration(i64::max_value()),
|row| {
out.recordings.push(json::Recording{
start_time_90k: row.range.start.0,
end_time_90k: row.range.end.0,
start_time_90k: row.time.start.0,
end_time_90k: row.time.end.0,
sample_file_bytes: row.sample_file_bytes,
video_samples: row.video_samples,
video_sample_entry_width: row.video_sample_entry.width,
@ -342,79 +393,90 @@ impl Handler {
}
fn camera_view_mp4(&self, uuid: Uuid, query: &str, req: &server::Request,
res: server::Response) -> Result<()> {
res: server::Response) -> Result<(), Error> {
let camera_id = {
let db = self.db.lock();
let camera = db.get_camera(uuid)
.ok_or_else(|| Error::new("no such camera".to_owned()))?;
camera.id
};
let mut start = None;
let mut end = None;
let mut include_ts = false;
let mut builder = mp4::Mp4FileBuilder::new();
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
let (key, value) = (key.borrow(), value.borrow());
match key {
"start_time_90k" => start = Some(recording::Time(i64::from_str(value)?)),
"end_time_90k" => end = Some(recording::Time(i64::from_str(value)?)),
"ts" => { include_ts = value == "true"; },
_ => {},
}
};
let start = start.ok_or_else(|| Error::new("start_time_90k missing".to_owned()))?;
let end = end.ok_or_else(|| Error::new("end_time_90k missing".to_owned()))?;
let desired_range = start .. end;
let mut builder = mp4::Mp4FileBuilder::new();
// There should be roughly ceil((end - start) / desired_recording_duration) recordings
// in the desired timespan if there are no gaps or overlap. Add a couple more to be safe:
// one for misalignment of the requested timespan with the rotate offset, another because
// rotation only happens at key frames.
let ceil_durations = ((end - start).0 + recording::DESIRED_RECORDING_DURATION - 1) /
"s" => {
let s = Segments::parse(value).map_err(
|_| Error::new(format!("invalid s parameter: {}", value)))?;
debug!("camera_view_mp4: appending s={:?}", s);
let mut est_segments = (s.ids.end - s.ids.start) as usize;
if let Some(end) = s.end_time {
// There should be roughly ceil((end - start) / desired_recording_duration)
// recordings in the desired timespan if there are no gaps or overlap,
// possibly another for misalignment of the requested timespan with the
// rotate offset and another because rotation only happens at key frames.
let ceil_durations = (end - s.start_time +
recording::DESIRED_RECORDING_DURATION - 1) /
recording::DESIRED_RECORDING_DURATION;
let est_records = (ceil_durations + 2) as usize;
let mut next_start = start;
builder.reserve(est_records);
{
let db = self.db.lock();
db.list_recordings(camera_id, &desired_range, |r| {
if builder.len() == 0 && r.start > next_start {
return Err(Error::new(format!("recording started late ({} vs requested {})",
r.start, start)));
} else if builder.len() != 0 && r.start != next_start {
return Err(Error::new(format!("gap/overlap in recording: {} to {} after row {}",
next_start, r.start, builder.len())));
est_segments = cmp::min(est_segments, (ceil_durations + 2) as usize);
}
next_start = r.start + recording::Duration(r.duration_90k as i64);
// TODO: check for inconsistent video sample entries.
builder.reserve(est_segments);
let db = self.db.lock();
let mut prev = None;
let mut cur_off = 0;
db.list_recordings_by_id(camera_id, s.ids.clone(), |r| {
// Check for missing recordings.
match prev {
None if r.id == s.ids.start => {},
None => return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.start))),
Some(id) if r.id != id + 1 => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, id + 1)));
},
_ => {},
};
prev = Some(r.id);
let rel_start = if r.start < start {
(start - r.start).0 as i32
// Add a segment for the relevant part of the recording, if any.
let end_time = s.end_time.unwrap_or(i64::max_value());
let d = r.duration_90k as i64;
if s.start_time <= cur_off + d && cur_off < end_time {
let start = cmp::max(0, s.start_time - cur_off);
let end = cmp::min(d, end_time - cur_off);
let times = start as i32 .. end as i32;
debug!("...appending recording {}/{} with times {:?} (out of dur {})",
r.camera_id, r.id, times, d);
builder.append(&db, r, start as i32 .. end as i32)?;
} else {
0
};
let rel_end = if r.start + recording::Duration(r.duration_90k as i64) > end {
(end - r.start).0 as i32
} else {
r.duration_90k
};
builder.append(&db, r, rel_start .. rel_end)?;
debug!("...skipping recording {}/{} dur {}", r.camera_id, r.id, d);
}
cur_off += d;
Ok(())
})?;
// Check for missing recordings.
match prev {
Some(id) if s.ids.end != id + 1 => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.end - 1)));
},
None => {
return Err(Error::new(format!("no such recording {}/{}",
camera_id, s.ids.start)));
},
_ => {},
};
if let Some(end) = s.end_time {
if end > cur_off {
return Err(Error::new(
format!("end time {} is beyond specified recordings", end)));
}
if next_start < end {
return Err(Error::new(format!(
"recording ends early: {}, not requested: {} after {} rows.",
next_start, end, builder.len())))
}
if builder.len() > est_records {
warn!("Estimated {} records for time [{}, {}); actually were {}",
est_records, start, end, builder.len());
} else {
debug!("Estimated {} records for time [{}, {}); actually were {}",
est_records, start, end, builder.len());
},
"ts" => builder.include_timestamp_subtitle_track(value == "true"),
_ => return Err(Error::new(format!("parameter {} not understood", key))),
}
builder.include_timestamp_subtitle_track(include_ts);
};
let mp4 = builder.build(self.db.clone(), self.dir.clone())?;
http_entity::serve(&mp4, req, res)?;
Ok(())
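The `cur_off` windowing above is easiest to verify with numbers. A
hypothetical request `s=1-2.26-100` against two recordings of 60 units each
proceeds as follows:

    recording 1: cur_off=0,  d=60: start=max(0, 26-0)=26, end=min(60, 100-0)=60  -> times 26..60
    recording 2: cur_off=60, d=60: start=max(0, 26-60)=0, end=min(60, 100-60)=40 -> times 0..40
    after loop:  cur_off=120 >= end_time=100, so no "beyond specified recordings" error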
@ -422,7 +484,7 @@ impl Handler {
/// Parses optional `start_time_90k` and `end_time_90k` query parameters, defaulting to the
/// full range of possible values.
fn get_optional_range(query: &str) -> Result<Range<recording::Time>> {
fn get_optional_range(query: &str) -> Result<Range<recording::Time>, Error> {
let mut start = i64::min_value();
let mut end = i64::max_value();
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
@ -455,10 +517,12 @@ impl server::Handler for Handler {
#[cfg(test)]
mod tests {
use super::{HtmlEscaped, Humanized};
use super::{HtmlEscaped, Humanized, Segments};
use testutil;
#[test]
fn test_humanize() {
testutil::init();
assert_eq!("1.0 B", format!("{:b}B", Humanized(1)));
assert_eq!("1.0 EiB", format!("{:b}B", Humanized(1i64 << 60)));
assert_eq!("1.5 EiB", format!("{:b}B", Humanized((1i64 << 60) + (1i64 << 59))));
@ -468,8 +532,30 @@ mod tests {
#[test]
fn test_html_escaped() {
testutil::init();
assert_eq!("", format!("{}", HtmlEscaped("")));
assert_eq!("no special chars", format!("{}", HtmlEscaped("no special chars")));
assert_eq!("a &lt;tag> &amp; text", format!("{}", HtmlEscaped("a <tag> & text")));
}
#[test]
fn test_segments() {
testutil::init();
assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: None},
Segments::parse("1").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: None},
Segments::parse("1.26-").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 0, end_time: Some(42)},
Segments::parse("1.-42").unwrap());
assert_eq!(Segments{ids: 1..2, start_time: 26, end_time: Some(42)},
Segments::parse("1.26-42").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: None},
Segments::parse("1-5").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: None},
Segments::parse("1-5.26-").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 0, end_time: Some(42)},
Segments::parse("1-5.-42").unwrap());
assert_eq!(Segments{ids: 1..6, start_time: 26, end_time: Some(42)},
Segments::parse("1-5.26-42").unwrap());
}
}